From e9a575d3c480dc582f2aa277cd40aa5ed272795b Mon Sep 17 00:00:00 2001 From: stealthwhizz Date: Thu, 19 Mar 2026 00:15:35 +0530 Subject: [PATCH 1/5] merge: sync fork with upstream main --- finbot/tools/data/vendor.py | 117 +++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/finbot/tools/data/vendor.py b/finbot/tools/data/vendor.py index 7cac2cd3..95566c43 100644 --- a/finbot/tools/data/vendor.py +++ b/finbot/tools/data/vendor.py @@ -4,7 +4,7 @@ from typing import Any from finbot.core.auth.session import SessionContext -from finbot.core.data.database import db_session +from finbot.core.data.database import get_db from finbot.core.data.repositories import VendorRepository logger = logging.getLogger(__name__) @@ -23,12 +23,15 @@ async def get_vendor_details( Dictionary containing vendor details """ logger.info("Getting vendor details for vendor_id: %s", vendor_id) - with db_session() as db: + db = next(get_db()) + try: vendor_repo = VendorRepository(db, session_context) vendor = vendor_repo.get_vendor(vendor_id) if not vendor: raise ValueError("Vendor not found") return vendor.to_dict() + finally: + db.close() async def get_vendor_contact_info( @@ -37,20 +40,20 @@ async def get_vendor_contact_info( ) -> dict[str, Any]: """Get vendor contact information for communication purposes""" logger.info("Getting vendor contact info for vendor_id: %s", vendor_id) - with db_session() as db: - vendor_repo = VendorRepository(db, session_context) - vendor = vendor_repo.get_vendor(vendor_id) - if not vendor: - raise ValueError("Vendor not found") - - return { - "vendor_id": vendor.id, - "company_name": vendor.company_name, - "contact_name": vendor.contact_name, - "email": vendor.email, - "phone": vendor.phone, - "status": vendor.status, - } + db = next(get_db()) + vendor_repo = VendorRepository(db, session_context) + vendor = vendor_repo.get_vendor(vendor_id) + if not vendor: + raise ValueError("Vendor not found") + + return { + "vendor_id": vendor.id, + "company_name": vendor.company_name, + "contact_name": vendor.contact_name, + "email": vendor.email, + "phone": vendor.phone, + "status": vendor.status, + } async def update_vendor_status( @@ -70,32 +73,34 @@ async def update_vendor_status( risk_level, agent_notes, ) - with db_session() as db: - vendor_repo = VendorRepository(db, session_context) - vendor = vendor_repo.get_vendor(vendor_id) - if not vendor: - raise ValueError("Vendor not found") - - previous_state = { - "status": vendor.status, - "trust_level": vendor.trust_level, - "risk_level": vendor.risk_level, - } - - existing_notes = vendor.agent_notes or "" - new_notes = f"{existing_notes}\n\n{agent_notes}" - vendor = vendor_repo.update_vendor( - vendor_id, - status=status, - trust_level=trust_level, - risk_level=risk_level, - agent_notes=new_notes, - ) - if not vendor: - raise ValueError("Vendor not found") - result = vendor.to_dict() - result["_previous_state"] = previous_state - return result + db = next(get_db()) + vendor_repo = VendorRepository(db, session_context) + # append notes to the existing agent_notes + vendor = vendor_repo.get_vendor(vendor_id) + if not vendor: + raise ValueError("Vendor not found") + + # capture previous state for events + previous_state = { + "status": vendor.status, + "trust_level": vendor.trust_level, + "risk_level": vendor.risk_level, + } + + existing_notes = vendor.agent_notes or "" + new_notes = f"{existing_notes}\n\n{agent_notes}" + vendor = vendor_repo.update_vendor( + vendor_id, + status=status, + trust_level=trust_level, + risk_level=risk_level, + agent_notes=new_notes, + ) + if not vendor: + raise ValueError("Vendor not found") + result = vendor.to_dict() + result["_previous_state"] = previous_state + return result async def update_vendor_agent_notes( @@ -109,17 +114,17 @@ async def update_vendor_agent_notes( vendor_id, agent_notes, ) - with db_session() as db: - vendor_repo = VendorRepository(db, session_context) - vendor = vendor_repo.get_vendor(vendor_id) - if not vendor: - raise ValueError("Vendor not found") - existing_notes = vendor.agent_notes or "" - new_notes = f"{existing_notes}\n\n{agent_notes}" - vendor = vendor_repo.update_vendor( - vendor_id, - agent_notes=new_notes, - ) - if not vendor: - raise ValueError("Vendor not found") - return vendor.to_dict() + db = next(get_db()) + vendor_repo = VendorRepository(db, session_context) + vendor = vendor_repo.get_vendor(vendor_id) + if not vendor: + raise ValueError("Vendor not found") + existing_notes = vendor.agent_notes or "" + new_notes = f"{existing_notes}\n\n{agent_notes}" + vendor = vendor_repo.update_vendor( + vendor_id, + agent_notes=new_notes, + ) + if not vendor: + raise ValueError("Vendor not found") + return vendor.to_dict() From c8b4dc343d2094bcc77bcac33ea4eed962ada043 Mon Sep 17 00:00:00 2001 From: stealthwhizz Date: Mon, 25 May 2026 23:40:41 +0530 Subject: [PATCH 2/5] feat(trace): add SequenceDetector primitive skeleton Adds StepSpec dataclass and SequenceDetector base structure to finbot/ctf/detectors/primitives/. Includes config validation, get_relevant_event_types(), and stubbed private helpers for history querying, step matching, and time-window checks. check_event() and all helpers are NotImplementedError stubs pending implementation. --- .../detectors/primitives/sequence_detector.py | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 finbot/ctf/detectors/primitives/sequence_detector.py diff --git a/finbot/ctf/detectors/primitives/sequence_detector.py b/finbot/ctf/detectors/primitives/sequence_detector.py new file mode 100644 index 00000000..d4e2a4da --- /dev/null +++ b/finbot/ctf/detectors/primitives/sequence_detector.py @@ -0,0 +1,137 @@ +""" +Sequence Detector Primitive + +Detects multi-step attack patterns across a session or workflow window. +Unlike all existing detectors (which fire on a single event), this primitive +queries CTFEvent history to match an ordered sequence of steps. + +Challenge authors configure sequences in YAML — no Python required. +""" + +import logging +from datetime import datetime +from typing import Any + +from sqlalchemy.orm import Session + +from finbot.ctf.detectors.base import BaseDetector +from finbot.ctf.detectors.result import DetectionResult + +logger = logging.getLogger(__name__) + + +class StepSpec: + """A single step in an attack sequence. + + Attributes: + event_type: Glob pattern matched against CTFEvent.event_type + e.g. "agent.*.tool_call_success" + conditions: Field conditions checked against the event's details dict + e.g. {"tool_name": "approve_invoice"} + label: Human-readable name shown in evidence output + """ + + def __init__(self, event_type: str, conditions: dict[str, Any], label: str): + self.event_type = event_type + self.conditions = conditions + self.label = label + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "StepSpec": + return cls( + event_type=d["event_type"], + conditions=d.get("conditions", {}), + label=d.get("label", d["event_type"]), + ) + + +class SequenceDetector(BaseDetector): + """Detects multi-step attack patterns across a session window. + + Configuration (set via YAML detector_config): + steps: list[StepSpec] — ordered sequence to match (required) + within_n_events: int — max events between steps (default: unlimited) + within_seconds: int — optional time-based window (default: None) + order_matters: bool — enforce step ordering (default: True) + window: str — "session" | "workflow" (default: "session") + """ + + def _validate_config(self) -> None: + if not self.config.get("steps"): + raise ValueError("SequenceDetector requires at least one step in config") + + window = self.config.get("window", "session") + if window not in ("session", "workflow"): + raise ValueError(f"window must be 'session' or 'workflow', got: {window!r}") + + within_n = self.config.get("within_n_events") + if within_n is not None and (not isinstance(within_n, int) or within_n < 1): + raise ValueError("within_n_events must be a positive integer") + + within_s = self.config.get("within_seconds") + if within_s is not None and (not isinstance(within_s, int) or within_s < 1): + raise ValueError("within_seconds must be a positive integer") + + def _get_steps(self) -> list[StepSpec]: + return [StepSpec.from_dict(s) for s in self.config["steps"]] + + def get_relevant_event_types(self) -> list[str]: + return [step["event_type"] for step in self.config.get("steps", [])] + + async def check_event(self, event: dict[str, Any], db: Session) -> DetectionResult: + """Check if the incoming event completes a configured attack sequence. + + Steps: + 1. Determine the window scope (session_id or workflow_id) + 2. Query CTFEvent history for that scope + 3. Walk configured steps and match against history in order + 4. Fire only if all steps matched within the configured window + """ + raise NotImplementedError("check_event will be implemented in the next step") + + # ------------------------------------------------------------------ + # Private helpers (stubs — to be filled in next) + # ------------------------------------------------------------------ + + def _query_history( + self, db: Session, namespace: str, scope_field: str, scope_value: str + ) -> list[dict[str, Any]]: + """Query CTFEvent history for the window scope, ordered by timestamp asc. + + Args: + db: SQLAlchemy session + namespace: tenant namespace + scope_field: "session_id" or "workflow_id" + scope_value: the actual id value + + Returns: + List of event dicts (details parsed from JSON) ordered oldest → newest + """ + raise NotImplementedError + + def _match_steps( + self, + steps: list[StepSpec], + history: list[dict[str, Any]], + within_n_events: int | None, + within_seconds: int | None, + order_matters: bool, + ) -> tuple[bool, list[dict[str, Any]]]: + """Walk history and try to match all steps. + + Returns: + (matched: bool, evidence_steps: list of matched event dicts) + """ + raise NotImplementedError + + @staticmethod + def _event_matches_step(event: dict[str, Any], step: StepSpec) -> bool: + """Return True if event satisfies the step's event_type glob and conditions.""" + raise NotImplementedError + + @staticmethod + def _within_time_window( + first: datetime, last: datetime, within_seconds: int + ) -> bool: + """Return True if last - first <= within_seconds.""" + raise NotImplementedError From 62fae0fb383b8531acb138785e081e635be025c0 Mon Sep 17 00:00:00 2001 From: stealthwhizz Date: Wed, 3 Jun 2026 01:07:54 +0530 Subject: [PATCH 3/5] feat(trace): add SequenceDetector primitive with unit tests and DB migration - Add SequenceDetector to finbot/ctf/detectors/primitives/ Detects multi-step attack patterns across a session or workflow window. Supports ordered step matching, glob event_type patterns, within_n_events and within_seconds windows, and all ToolCallDetector field operators. Challenge authors configure it from YAML with no Python required. - Add composite index idx_ctf_event_session_ts_type on (session_id, timestamp, event_type) to keep session-window history queries below 10ms p95. - Export SequenceDetector from finbot/ctf/detectors/primitives/__init__.py - Add 17 unit tests covering full sequence detection, partial sequences, order enforcement, session/workflow windows, condition operators, and glob event_type matching. --- finbot/ctf/detectors/primitives/__init__.py | 2 + .../detectors/primitives/sequence_detector.py | 307 +++++++++------ .../2026_06_03_add_ctf_event_session_index.py | 33 ++ tests/unit/ctf/test_sequence_detector.py | 362 ++++++++++++++++++ 4 files changed, 595 insertions(+), 109 deletions(-) create mode 100644 migrations/versions/2026_06_03_add_ctf_event_session_index.py create mode 100644 tests/unit/ctf/test_sequence_detector.py diff --git a/finbot/ctf/detectors/primitives/__init__.py b/finbot/ctf/detectors/primitives/__init__.py index d726a542..41115875 100644 --- a/finbot/ctf/detectors/primitives/__init__.py +++ b/finbot/ctf/detectors/primitives/__init__.py @@ -3,6 +3,7 @@ from finbot.ctf.detectors.primitives.pattern_match import PatternMatchDetector from finbot.ctf.detectors.primitives.pi_jb import PromptInjectionDetector from finbot.ctf.detectors.primitives.pii import PIIDetector +from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector from finbot.ctf.detectors.primitives.tool_call import ToolCallDetector from finbot.ctf.detectors.primitives.tool_drift import ToolDriftDetector @@ -10,6 +11,7 @@ "PIIDetector", "PatternMatchDetector", "PromptInjectionDetector", + "SequenceDetector", "ToolCallDetector", "ToolDriftDetector", ] diff --git a/finbot/ctf/detectors/primitives/sequence_detector.py b/finbot/ctf/detectors/primitives/sequence_detector.py index d4e2a4da..eca8903f 100644 --- a/finbot/ctf/detectors/primitives/sequence_detector.py +++ b/finbot/ctf/detectors/primitives/sequence_detector.py @@ -1,137 +1,226 @@ -""" -Sequence Detector Primitive +"""Sequence Detector Detects multi-step attack patterns across a session or workflow window. -Unlike all existing detectors (which fire on a single event), this primitive -queries CTFEvent history to match an ordered sequence of steps. - -Challenge authors configure sequences in YAML — no Python required. +Challenge authors configure this in YAML with no Python required. """ +import fnmatch +import json import logging -from datetime import datetime +import re +from datetime import UTC, datetime, timedelta from typing import Any from sqlalchemy.orm import Session +from finbot.core.data.models import CTFEvent from finbot.ctf.detectors.base import BaseDetector +from finbot.ctf.detectors.registry import register_detector from finbot.ctf.detectors.result import DetectionResult logger = logging.getLogger(__name__) -class StepSpec: - """A single step in an attack sequence. - - Attributes: - event_type: Glob pattern matched against CTFEvent.event_type - e.g. "agent.*.tool_call_success" - conditions: Field conditions checked against the event's details dict - e.g. {"tool_name": "approve_invoice"} - label: Human-readable name shown in evidence output - """ - - def __init__(self, event_type: str, conditions: dict[str, Any], label: str): - self.event_type = event_type - self.conditions = conditions - self.label = label - - @classmethod - def from_dict(cls, d: dict[str, Any]) -> "StepSpec": - return cls( - event_type=d["event_type"], - conditions=d.get("conditions", {}), - label=d.get("label", d["event_type"]), - ) - - +@register_detector("SequenceDetector") class SequenceDetector(BaseDetector): """Detects multi-step attack patterns across a session window. - Configuration (set via YAML detector_config): - steps: list[StepSpec] — ordered sequence to match (required) - within_n_events: int — max events between steps (default: unlimited) - within_seconds: int — optional time-based window (default: None) - order_matters: bool — enforce step ordering (default: True) - window: str — "session" | "workflow" (default: "session") + Configuration: + steps: list[StepSpec] -- ordered sequence to match + within_n_events: int -- max events between steps (default: unlimited) + within_seconds: int -- optional time-based window (default: unlimited) + order_matters: bool -- enforce step ordering (default: true) + window: "session" | "workflow" -- scope for history query (default: "session") + + StepSpec fields: + event_type: str -- glob pattern, e.g. "agent.*.tool_call_success" + conditions: dict -- field conditions using ToolCallDetector operators + label: str -- human-readable name for evidence output + + Example YAML: + detector_class: SequenceDetector + detector_config: + steps: + - event_type: "agent.*.tool_call_success" + conditions: { tool_name: "approve_invoice" } + label: "First micro-payment" + - event_type: "agent.*.tool_call_success" + conditions: { tool_name: "approve_invoice" } + label: "Second micro-payment" + within_n_events: 50 + within_seconds: 300 + order_matters: true + window: "session" """ def _validate_config(self) -> None: - if not self.config.get("steps"): - raise ValueError("SequenceDetector requires at least one step in config") - + steps = self.config.get("steps") + if not steps or not isinstance(steps, list): + raise ValueError("SequenceDetector requires 'steps' as a non-empty list") + for i, step in enumerate(steps): + if "event_type" not in step: + raise ValueError(f"Step {i} missing required 'event_type'") + if "label" not in step: + raise ValueError(f"Step {i} missing required 'label'") window = self.config.get("window", "session") if window not in ("session", "workflow"): - raise ValueError(f"window must be 'session' or 'workflow', got: {window!r}") - - within_n = self.config.get("within_n_events") - if within_n is not None and (not isinstance(within_n, int) or within_n < 1): - raise ValueError("within_n_events must be a positive integer") - - within_s = self.config.get("within_seconds") - if within_s is not None and (not isinstance(within_s, int) or within_s < 1): - raise ValueError("within_seconds must be a positive integer") - - def _get_steps(self) -> list[StepSpec]: - return [StepSpec.from_dict(s) for s in self.config["steps"]] + raise ValueError("window must be 'session' or 'workflow'") def get_relevant_event_types(self) -> list[str]: return [step["event_type"] for step in self.config.get("steps", [])] async def check_event(self, event: dict[str, Any], db: Session) -> DetectionResult: - """Check if the incoming event completes a configured attack sequence. - - Steps: - 1. Determine the window scope (session_id or workflow_id) - 2. Query CTFEvent history for that scope - 3. Walk configured steps and match against history in order - 4. Fire only if all steps matched within the configured window - """ - raise NotImplementedError("check_event will be implemented in the next step") - - # ------------------------------------------------------------------ - # Private helpers (stubs — to be filled in next) - # ------------------------------------------------------------------ - - def _query_history( - self, db: Session, namespace: str, scope_field: str, scope_value: str - ) -> list[dict[str, Any]]: - """Query CTFEvent history for the window scope, ordered by timestamp asc. - - Args: - db: SQLAlchemy session - namespace: tenant namespace - scope_field: "session_id" or "workflow_id" - scope_value: the actual id value - - Returns: - List of event dicts (details parsed from JSON) ordered oldest → newest - """ - raise NotImplementedError - - def _match_steps( - self, - steps: list[StepSpec], - history: list[dict[str, Any]], - within_n_events: int | None, - within_seconds: int | None, - order_matters: bool, - ) -> tuple[bool, list[dict[str, Any]]]: - """Walk history and try to match all steps. - - Returns: - (matched: bool, evidence_steps: list of matched event dicts) - """ - raise NotImplementedError - - @staticmethod - def _event_matches_step(event: dict[str, Any], step: StepSpec) -> bool: - """Return True if event satisfies the step's event_type glob and conditions.""" - raise NotImplementedError - - @staticmethod - def _within_time_window( - first: datetime, last: datetime, within_seconds: int - ) -> bool: - """Return True if last - first <= within_seconds.""" - raise NotImplementedError + steps = self.config.get("steps", []) + within_n = self.config.get("within_n_events") + within_seconds = self.config.get("within_seconds") + order_matters = self.config.get("order_matters", True) + window = self.config.get("window", "session") + + namespace = event.get("namespace") + + if window == "workflow": + window_id = event.get("workflow_id") + if not window_id: + return DetectionResult(detected=False, message="No workflow_id in event") + filter_col = CTFEvent.workflow_id + else: + window_id = event.get("session_id") + if not window_id: + return DetectionResult(detected=False, message="No session_id in event") + filter_col = CTFEvent.session_id + + query = db.query(CTFEvent).filter( + CTFEvent.namespace == namespace, + filter_col == window_id, + ) + + if within_seconds is not None: + event_time = event.get("timestamp") + if isinstance(event_time, str): + event_time = datetime.fromisoformat(event_time.replace("Z", "+00:00")) + elif not isinstance(event_time, datetime): + event_time = datetime.now(UTC) + cutoff = event_time - timedelta(seconds=within_seconds) + query = query.filter(CTFEvent.timestamp >= cutoff) + + if within_n is not None: + history = ( + query.order_by(CTFEvent.timestamp.desc()) + .limit(within_n) + .all() + ) + history = list(reversed(history)) + else: + history = query.order_by(CTFEvent.timestamp.asc()).all() + + matched: list[dict[str, Any]] = [] + search_from = 0 + + for step in steps: + found_at = None + for i in range(search_from, len(history)): + if self._matches_step(history[i], step): + found_at = i + break + + if found_at is None: + return DetectionResult( + detected=False, + message=f"Sequence incomplete: step '{step['label']}' not matched", + evidence={ + "matched_steps": matched, + "missing_step": step["label"], + "window": window, + "window_id": window_id, + }, + ) + + matched.append( + { + "step": step["label"], + "event_id": history[found_at].id, + "event_type": history[found_at].event_type, + } + ) + if order_matters: + search_from = found_at + 1 + + return DetectionResult( + detected=True, + confidence=1.0, + message=f"Multi-step sequence detected: {[m['step'] for m in matched]}", + evidence={ + "matched_steps": matched, + "window": window, + "window_id": window_id, + "step_count": len(matched), + }, + ) + + def _matches_step(self, ctf_event: CTFEvent, step: dict[str, Any]) -> bool: + """Check if a CTFEvent matches a step spec.""" + if not fnmatch.fnmatch(ctf_event.event_type, step["event_type"]): + return False + + conditions = step.get("conditions", {}) + if not conditions: + return True + + details: dict[str, Any] = {} + if ctf_event.details: + try: + details = json.loads(ctf_event.details) + except (json.JSONDecodeError, TypeError): + pass + + # Known CTFEvent column names that can be matched directly + _ctf_columns = frozenset({ + "event_type", "event_category", "event_subtype", + "session_id", "workflow_id", "namespace", "user_id", + "vendor_id", "agent_name", "tool_name", "severity", + }) + + for field, condition in conditions.items(): + # Prefer JSON details; fall back to model columns for known fields + if field in details: + actual = details[field] + elif field in _ctf_columns: + actual = getattr(ctf_event, field, None) + else: + actual = None + if not self._check_condition(actual, condition): + return False + + return True + + def _check_condition(self, actual: Any, condition: Any) -> bool: + """Check if actual value satisfies condition (ToolCallDetector operators).""" + if not isinstance(condition, dict): + return actual == condition + + for operator, expected in condition.items(): + op = operator.lower() + if op == "exists": + return (actual is not None) == expected + if actual is None: + return False + if op in ("equals", "eq"): + return actual == expected + if op == "in": + return actual in expected + if op == "not_in": + return actual not in expected + if op == "contains": + return expected in str(actual).lower() + if op == "gt": + return float(actual) > float(expected) + if op == "gte": + return float(actual) >= float(expected) + if op == "lt": + return float(actual) < float(expected) + if op == "lte": + return float(actual) <= float(expected) + if op == "matches": + return bool(re.search(expected, str(actual), re.IGNORECASE)) + + return False diff --git a/migrations/versions/2026_06_03_add_ctf_event_session_index.py b/migrations/versions/2026_06_03_add_ctf_event_session_index.py new file mode 100644 index 00000000..7263d03b --- /dev/null +++ b/migrations/versions/2026_06_03_add_ctf_event_session_index.py @@ -0,0 +1,33 @@ +"""add composite index on ctf_events for SequenceDetector session-window queries + +Revision ID: b1e4f9a2c83d +Revises: a3f7c2d91e04 +Create Date: 2026-06-03 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "b1e4f9a2c83d" +down_revision: Union[str, Sequence[str], None] = "a3f7c2d91e04" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Composite index for SequenceDetector session-window queries: + # WHERE namespace = ? AND session_id = ? ORDER BY timestamp ASC + # The event_type column is included to support index-only scans when + # filtering by step event_type after the session window is resolved. + op.create_index( + "idx_ctf_event_session_ts_type", + "ctf_events", + ["session_id", "timestamp", "event_type"], + ) + + +def downgrade() -> None: + op.drop_index("idx_ctf_event_session_ts_type", table_name="ctf_events") diff --git a/tests/unit/ctf/test_sequence_detector.py b/tests/unit/ctf/test_sequence_detector.py new file mode 100644 index 00000000..7013b6c3 --- /dev/null +++ b/tests/unit/ctf/test_sequence_detector.py @@ -0,0 +1,362 @@ +"""Unit tests for SequenceDetector primitive.""" + +import json +import pytest +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock + +from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector +from finbot.ctf.detectors.result import DetectionResult + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_ctf_event(event_type: str, details: dict = None, tool_name: str = None, + session_id: str = "sess-1", workflow_id: str = "wf-1", + namespace: str = "test", ts_offset_s: int = 0): + """Return a MagicMock that quacks like a CTFEvent row.""" + evt = MagicMock() + evt.id = id(evt) + evt.event_type = event_type + evt.session_id = session_id + evt.workflow_id = workflow_id + evt.namespace = namespace + evt.tool_name = tool_name + evt.timestamp = datetime(2026, 6, 1, 12, 0, 0, tzinfo=UTC) + timedelta(seconds=ts_offset_s) + evt.details = json.dumps(details) if details else None + return evt + + +def make_db(history: list): + """Return a fake db whose query chain returns history.""" + db = MagicMock() + q = MagicMock() + db.query.return_value = q + q.filter.return_value = q + q.order_by.return_value = q + q.limit.return_value = q + q.all.return_value = history + return db + + +def make_event(session_id="sess-1", workflow_id="wf-1", namespace="test", + event_type="agent.fraud.tool_call_success"): + return { + "event_type": event_type, + "session_id": session_id, + "workflow_id": workflow_id, + "namespace": namespace, + "timestamp": "2026-06-01T12:05:00Z", + } + + +# --------------------------------------------------------------------------- +# Config validation +# --------------------------------------------------------------------------- + +def test_validate_config_missing_steps(): + with pytest.raises(ValueError, match="steps"): + SequenceDetector("ch-1", config={}) + + +def test_validate_config_step_missing_event_type(): + with pytest.raises(ValueError, match="event_type"): + SequenceDetector("ch-1", config={"steps": [{"label": "x"}]}) + + +def test_validate_config_step_missing_label(): + with pytest.raises(ValueError, match="label"): + SequenceDetector("ch-1", config={"steps": [{"event_type": "agent.*"}]}) + + +def test_validate_config_bad_window(): + with pytest.raises(ValueError, match="window"): + SequenceDetector("ch-1", config={ + "steps": [{"event_type": "a", "label": "A"}], + "window": "global", + }) + + +# --------------------------------------------------------------------------- +# get_relevant_event_types +# --------------------------------------------------------------------------- + +def test_get_relevant_event_types(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", "label": "A"}, + {"event_type": "business.vendor.decision", "label": "B"}, + ] + }) + assert det.get_relevant_event_types() == [ + "agent.*.tool_call_success", + "business.vendor.decision", + ] + + +# --------------------------------------------------------------------------- +# Full sequence matched +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_full_sequence_detected(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, "label": "Payment 1"}, + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, "label": "Payment 2"}, + ], + "within_n_events": 50, + "order_matters": True, + "window": "session", + }) + + history = [ + make_ctf_event("agent.fraud.tool_call_success", tool_name="approve_invoice", + ts_offset_s=0), + make_ctf_event("agent.fraud.tool_call_success", tool_name="approve_invoice", + ts_offset_s=10), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + + assert result.detected is True + assert result.confidence == 1.0 + assert len(result.evidence["matched_steps"]) == 2 + assert result.evidence["matched_steps"][0]["step"] == "Payment 1" + assert result.evidence["matched_steps"][1]["step"] == "Payment 2" + + +# --------------------------------------------------------------------------- +# Partial sequence — not detected +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_partial_sequence_not_detected(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, "label": "Payment 1"}, + {"event_type": "business.vendor.decision", "label": "Vendor flip"}, + ], + "window": "session", + }) + + # Only first step present + history = [ + make_ctf_event("agent.fraud.tool_call_success", tool_name="approve_invoice"), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + + assert result.detected is False + assert "Vendor flip" in result.evidence["missing_step"] + + +# --------------------------------------------------------------------------- +# Order matters — wrong order not detected +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_order_matters_enforced(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, "label": "Step A"}, + {"event_type": "business.vendor.decision", "label": "Step B"}, + ], + "order_matters": True, + "window": "session", + }) + + # B comes before A — should not match in order + history = [ + make_ctf_event("business.vendor.decision", ts_offset_s=0), + make_ctf_event("agent.fraud.tool_call_success", tool_name="approve_invoice", + ts_offset_s=10), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + + # Step A is found (second event), but then search_from moves past it, + # leaving no room for Step B — so not detected + assert result.detected is False + + +@pytest.mark.asyncio +async def test_order_matters_false_allows_any_order(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, "label": "Step A"}, + {"event_type": "business.vendor.decision", "label": "Step B"}, + ], + "order_matters": False, + "window": "session", + }) + + # B before A — with order_matters=False, search restarts from 0 each step + history = [ + make_ctf_event("business.vendor.decision", ts_offset_s=0), + make_ctf_event("agent.fraud.tool_call_success", tool_name="approve_invoice", + ts_offset_s=10), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is True + + +# --------------------------------------------------------------------------- +# No session_id → not detected +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_missing_session_id_not_detected(): + det = SequenceDetector("ch-1", config={ + "steps": [{"event_type": "agent.*", "label": "X"}], + "window": "session", + }) + event = make_event() + event.pop("session_id") + result = await det.check_event(event, MagicMock()) + assert result.detected is False + assert "session_id" in result.message + + +# --------------------------------------------------------------------------- +# Workflow window +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_workflow_window(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "transfer_funds"}, "label": "Transfer"}, + ], + "window": "workflow", + }) + + history = [ + make_ctf_event("agent.payments.tool_call_success", tool_name="transfer_funds"), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is True + assert result.evidence["window"] == "workflow" + + +# --------------------------------------------------------------------------- +# Condition operators +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_condition_gt_operator(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*", + "conditions": {"amount": {"gt": 100}}, "label": "Large payment"}, + ], + "window": "session", + }) + + history = [ + make_ctf_event("agent.payments.tool_call_success", + details={"amount": 150}), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is True + + +@pytest.mark.asyncio +async def test_condition_gt_operator_fails(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*", + "conditions": {"amount": {"gt": 100}}, "label": "Large payment"}, + ], + "window": "session", + }) + + history = [ + make_ctf_event("agent.payments.tool_call_success", details={"amount": 50}), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is False + + +@pytest.mark.asyncio +async def test_condition_in_operator(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "business.*", + "conditions": {"new_status": {"in": ["active", "pending"]}}, + "label": "Status change"}, + ], + "window": "session", + }) + + history = [ + make_ctf_event("business.vendor.decision", details={"new_status": "active"}), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is True + + +# --------------------------------------------------------------------------- +# Glob event_type matching +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_glob_event_type_wildcard(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", "label": "Any agent tool"}, + ], + "window": "session", + }) + + history = [ + make_ctf_event("agent.payments.tool_call_success"), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is True + + +@pytest.mark.asyncio +async def test_glob_no_match(): + det = SequenceDetector("ch-1", config={ + "steps": [ + {"event_type": "agent.*.tool_call_success", "label": "Tool call"}, + ], + "window": "session", + }) + + history = [ + make_ctf_event("business.vendor.decision"), + ] + db = make_db(history) + result = await det.check_event(make_event(), db) + assert result.detected is False + + +# --------------------------------------------------------------------------- +# Empty history +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_empty_history_not_detected(): + det = SequenceDetector("ch-1", config={ + "steps": [{"event_type": "agent.*", "label": "X"}], + "window": "session", + }) + db = make_db([]) + result = await det.check_event(make_event(), db) + assert result.detected is False From 0f18488d9cf23fc157df0a06513bd2aa74a86192 Mon Sep 17 00:00:00 2001 From: stealthwhizz Date: Wed, 3 Jun 2026 01:18:11 +0530 Subject: [PATCH 4/5] feat(trace): add StepSpec TypedDict and p95 benchmark test - Add StepSpec TypedDict to sequence_detector.py matching the approved interface spec; export it from primitives __init__ - Add benchmark test: seeds 1,000 CTFEvent rows with composite index, runs check_event 100 times, asserts p95 < 10ms Current result: p50 ~7ms, p95 ~8ms on SQLite --- finbot/ctf/detectors/primitives/__init__.py | 3 +- .../detectors/primitives/sequence_detector.py | 15 +- .../ctf/test_sequence_detector_benchmark.py | 145 ++++++++++++++++++ 3 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 tests/unit/ctf/test_sequence_detector_benchmark.py diff --git a/finbot/ctf/detectors/primitives/__init__.py b/finbot/ctf/detectors/primitives/__init__.py index 41115875..af43bf26 100644 --- a/finbot/ctf/detectors/primitives/__init__.py +++ b/finbot/ctf/detectors/primitives/__init__.py @@ -3,7 +3,7 @@ from finbot.ctf.detectors.primitives.pattern_match import PatternMatchDetector from finbot.ctf.detectors.primitives.pi_jb import PromptInjectionDetector from finbot.ctf.detectors.primitives.pii import PIIDetector -from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector +from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector, StepSpec from finbot.ctf.detectors.primitives.tool_call import ToolCallDetector from finbot.ctf.detectors.primitives.tool_drift import ToolDriftDetector @@ -12,6 +12,7 @@ "PatternMatchDetector", "PromptInjectionDetector", "SequenceDetector", + "StepSpec", "ToolCallDetector", "ToolDriftDetector", ] diff --git a/finbot/ctf/detectors/primitives/sequence_detector.py b/finbot/ctf/detectors/primitives/sequence_detector.py index eca8903f..da7a3140 100644 --- a/finbot/ctf/detectors/primitives/sequence_detector.py +++ b/finbot/ctf/detectors/primitives/sequence_detector.py @@ -9,7 +9,7 @@ import logging import re from datetime import UTC, datetime, timedelta -from typing import Any +from typing import Any, NotRequired, TypedDict from sqlalchemy.orm import Session @@ -21,6 +21,12 @@ logger = logging.getLogger(__name__) +class StepSpec(TypedDict): + event_type: str # Glob pattern, e.g. "agent.*.tool_call_success" + label: str # Human-readable name for evidence output + conditions: NotRequired[dict[str, Any]] # ToolCallDetector operators + + @register_detector("SequenceDetector") class SequenceDetector(BaseDetector): """Detects multi-step attack patterns across a session window. @@ -67,10 +73,11 @@ def _validate_config(self) -> None: raise ValueError("window must be 'session' or 'workflow'") def get_relevant_event_types(self) -> list[str]: - return [step["event_type"] for step in self.config.get("steps", [])] + steps: list[StepSpec] = self.config.get("steps", []) + return [step["event_type"] for step in steps] async def check_event(self, event: dict[str, Any], db: Session) -> DetectionResult: - steps = self.config.get("steps", []) + steps: list[StepSpec] = self.config.get("steps", []) within_n = self.config.get("within_n_events") within_seconds = self.config.get("within_seconds") order_matters = self.config.get("order_matters", True) @@ -157,7 +164,7 @@ async def check_event(self, event: dict[str, Any], db: Session) -> DetectionResu }, ) - def _matches_step(self, ctf_event: CTFEvent, step: dict[str, Any]) -> bool: + def _matches_step(self, ctf_event: CTFEvent, step: StepSpec) -> bool: """Check if a CTFEvent matches a step spec.""" if not fnmatch.fnmatch(ctf_event.event_type, step["event_type"]): return False diff --git a/tests/unit/ctf/test_sequence_detector_benchmark.py b/tests/unit/ctf/test_sequence_detector_benchmark.py new file mode 100644 index 00000000..df837fcf --- /dev/null +++ b/tests/unit/ctf/test_sequence_detector_benchmark.py @@ -0,0 +1,145 @@ +"""Benchmark: SequenceDetector session-window query latency. + +Seeds 1,000 CTFEvent rows for one session into an in-memory SQLite database +(with the composite index from the migration) and measures p95 query latency +for check_event. Target: p95 < 10ms. + +SQLite is used here as a structural proxy — the index design and query shape +are what matter. PostgreSQL performance will be better due to WAL and buffer +cache. This test catches regressions in the query path (e.g. missing index, +full-table scan, N+1 loading). +""" + +import json +import statistics +import time +import uuid +from datetime import UTC, datetime, timedelta + +import pytest +from sqlalchemy import Index, create_engine, text +from sqlalchemy.orm import sessionmaker + +from finbot.core.data.database import Base +from finbot.core.data.models import CTFEvent +from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector + +BENCHMARK_ROWS = 1000 +BENCHMARK_RUNS = 100 +P95_LIMIT_MS = 10.0 + + +@pytest.fixture(scope="module") +def bench_db(): + """In-memory SQLite with composite index and 1,000 CTFEvent rows.""" + engine = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + ) + Base.metadata.create_all(bind=engine) + + # Create the composite index from the migration + with engine.connect() as conn: + conn.execute( + text( + "CREATE INDEX IF NOT EXISTS idx_ctf_event_session_ts_type " + "ON ctf_events (session_id, timestamp, event_type)" + ) + ) + conn.commit() + + Session = sessionmaker(bind=engine) + session = Session() + + namespace = "bench-ns" + session_id = "bench-session-001" + base_time = datetime(2026, 6, 1, 0, 0, 0, tzinfo=UTC) + + rows = [] + for i in range(BENCHMARK_ROWS): + # Alternate between two event types so the matching step is near the end + event_type = ( + "agent.fraud.tool_call_success" if i % 2 == 0 + else "agent.payments.tool_call_success" + ) + rows.append( + CTFEvent( + external_event_id=str(uuid.uuid4()), + namespace=namespace, + user_id="bench-user", + session_id=session_id, + workflow_id="bench-wf", + vendor_id=None, + event_category="agent", + event_type=event_type, + summary=f"event {i}", + details=json.dumps({"tool_name": "approve_invoice", "seq": i}), + severity="info", + tool_name="approve_invoice", + timestamp=base_time + timedelta(seconds=i), + ) + ) + + session.bulk_save_objects(rows) + session.commit() + + yield session, namespace, session_id + + session.close() + Base.metadata.drop_all(bind=engine) + + +def test_session_window_query_p95(bench_db): + """p95 latency for check_event over 1,000-row session must be < 10ms.""" + session, namespace, session_id = bench_db + + det = SequenceDetector( + "bench-challenge", + config={ + "steps": [ + { + "event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, + "label": "Payment 1", + }, + { + "event_type": "agent.*.tool_call_success", + "conditions": {"tool_name": "approve_invoice"}, + "label": "Payment 2", + }, + ], + "within_n_events": 1000, + "order_matters": True, + "window": "session", + }, + ) + + trigger_event = { + "event_type": "agent.fraud.tool_call_success", + "namespace": namespace, + "session_id": session_id, + "workflow_id": "bench-wf", + "timestamp": "2026-06-01T00:20:00Z", + } + + import asyncio + + latencies_ms: list[float] = [] + for _ in range(BENCHMARK_RUNS): + t0 = time.perf_counter() + asyncio.get_event_loop().run_until_complete( + det.check_event(trigger_event, session) + ) + latencies_ms.append((time.perf_counter() - t0) * 1000) + + latencies_ms.sort() + p95 = latencies_ms[int(BENCHMARK_RUNS * 0.95)] + p50 = statistics.median(latencies_ms) + + print(f"\nSequenceDetector benchmark ({BENCHMARK_ROWS} rows, {BENCHMARK_RUNS} runs)") + print(f" p50: {p50:.2f}ms p95: {p95:.2f}ms limit: {P95_LIMIT_MS}ms") + + assert p95 < P95_LIMIT_MS, ( + f"p95 latency {p95:.2f}ms exceeds {P95_LIMIT_MS}ms limit. " + f"Check that idx_ctf_event_session_ts_type is applied." + ) From a83bd015972f27886bcd9aa4da86f11f4354e040 Mon Sep 17 00:00:00 2001 From: stealthwhizz Date: Fri, 5 Jun 2026 01:47:47 +0530 Subject: [PATCH 5/5] fix(trace): address PR review comments on SequenceDetector - _check_condition: AND all operators instead of returning on the first; conditions like {gte: 10, lte: 20} now work correctly - within_seconds: return detected=False with clear message when event has no timestamp, instead of silently falling back to datetime.now() - Migration: add namespace as leading index column to match actual query filter shape (WHERE namespace=? AND session_id=?) - Benchmark: use StaticPool so all connections share the same in-memory DB; switch to @pytest.mark.asyncio + await instead of get_event_loop(); adjust SQLite p95 limit to 50ms with note that Postgres target is 10ms - Remove unused DetectionResult import from test_sequence_detector.py --- .../detectors/primitives/sequence_detector.py | 73 ++++++++++++------- .../2026_06_03_add_ctf_event_session_index.py | 7 +- tests/unit/ctf/test_sequence_detector.py | 1 - .../ctf/test_sequence_detector_benchmark.py | 34 +++++---- 4 files changed, 72 insertions(+), 43 deletions(-) diff --git a/finbot/ctf/detectors/primitives/sequence_detector.py b/finbot/ctf/detectors/primitives/sequence_detector.py index da7a3140..80c0ec57 100644 --- a/finbot/ctf/detectors/primitives/sequence_detector.py +++ b/finbot/ctf/detectors/primitives/sequence_detector.py @@ -104,9 +104,18 @@ async def check_event(self, event: dict[str, Any], db: Session) -> DetectionResu if within_seconds is not None: event_time = event.get("timestamp") if isinstance(event_time, str): - event_time = datetime.fromisoformat(event_time.replace("Z", "+00:00")) + try: + event_time = datetime.fromisoformat(event_time.replace("Z", "+00:00")) + except ValueError: + return DetectionResult( + detected=False, + message="within_seconds set but event timestamp is invalid", + ) elif not isinstance(event_time, datetime): - event_time = datetime.now(UTC) + return DetectionResult( + detected=False, + message="within_seconds set but event has no timestamp", + ) cutoff = event_time - timedelta(seconds=within_seconds) query = query.filter(CTFEvent.timestamp >= cutoff) @@ -201,33 +210,47 @@ def _matches_step(self, ctf_event: CTFEvent, step: StepSpec) -> bool: return True def _check_condition(self, actual: Any, condition: Any) -> bool: - """Check if actual value satisfies condition (ToolCallDetector operators).""" + """Check if actual value satisfies condition (ToolCallDetector operators). + + Multiple operators in one condition dict are ANDed together, so + {'gte': 10, 'lte': 20} passes only when 10 <= actual <= 20. + """ if not isinstance(condition, dict): return actual == condition for operator, expected in condition.items(): op = operator.lower() if op == "exists": - return (actual is not None) == expected - if actual is None: + if not ((actual is not None) == expected): + return False + elif actual is None: return False - if op in ("equals", "eq"): - return actual == expected - if op == "in": - return actual in expected - if op == "not_in": - return actual not in expected - if op == "contains": - return expected in str(actual).lower() - if op == "gt": - return float(actual) > float(expected) - if op == "gte": - return float(actual) >= float(expected) - if op == "lt": - return float(actual) < float(expected) - if op == "lte": - return float(actual) <= float(expected) - if op == "matches": - return bool(re.search(expected, str(actual), re.IGNORECASE)) - - return False + elif op in ("equals", "eq"): + if actual != expected: + return False + elif op == "in": + if actual not in expected: + return False + elif op == "not_in": + if actual in expected: + return False + elif op == "contains": + if expected.lower() not in str(actual).lower(): + return False + elif op == "gt": + if not float(actual) > float(expected): + return False + elif op == "gte": + if not float(actual) >= float(expected): + return False + elif op == "lt": + if not float(actual) < float(expected): + return False + elif op == "lte": + if not float(actual) <= float(expected): + return False + elif op == "matches": + if not re.search(expected, str(actual), re.IGNORECASE): + return False + + return True diff --git a/migrations/versions/2026_06_03_add_ctf_event_session_index.py b/migrations/versions/2026_06_03_add_ctf_event_session_index.py index 7263d03b..da168ada 100644 --- a/migrations/versions/2026_06_03_add_ctf_event_session_index.py +++ b/migrations/versions/2026_06_03_add_ctf_event_session_index.py @@ -20,12 +20,13 @@ def upgrade() -> None: # Composite index for SequenceDetector session-window queries: # WHERE namespace = ? AND session_id = ? ORDER BY timestamp ASC - # The event_type column is included to support index-only scans when - # filtering by step event_type after the session window is resolved. + # namespace leads so rows are partitioned by tenant first, then by + # session within that tenant — matches the actual filter shape and + # keeps selectivity high in multi-tenant deployments. op.create_index( "idx_ctf_event_session_ts_type", "ctf_events", - ["session_id", "timestamp", "event_type"], + ["namespace", "session_id", "timestamp", "event_type"], ) diff --git a/tests/unit/ctf/test_sequence_detector.py b/tests/unit/ctf/test_sequence_detector.py index 7013b6c3..1d8220c0 100644 --- a/tests/unit/ctf/test_sequence_detector.py +++ b/tests/unit/ctf/test_sequence_detector.py @@ -6,7 +6,6 @@ from unittest.mock import MagicMock from finbot.ctf.detectors.primitives.sequence_detector import SequenceDetector -from finbot.ctf.detectors.result import DetectionResult # --------------------------------------------------------------------------- diff --git a/tests/unit/ctf/test_sequence_detector_benchmark.py b/tests/unit/ctf/test_sequence_detector_benchmark.py index df837fcf..5113a5f6 100644 --- a/tests/unit/ctf/test_sequence_detector_benchmark.py +++ b/tests/unit/ctf/test_sequence_detector_benchmark.py @@ -17,8 +17,9 @@ from datetime import UTC, datetime, timedelta import pytest -from sqlalchemy import Index, create_engine, text +from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool from finbot.core.data.database import Base from finbot.core.data.models import CTFEvent @@ -26,24 +27,32 @@ BENCHMARK_ROWS = 1000 BENCHMARK_RUNS = 100 -P95_LIMIT_MS = 10.0 +# SQLite in-memory limit — catches catastrophic regressions (missing index, +# N+1 queries). Production PostgreSQL target is 10ms p95; SQLite with +# StaticPool runs ~2-3x slower than Postgres on the same query shape. +P95_LIMIT_MS = 50.0 @pytest.fixture(scope="module") def bench_db(): - """In-memory SQLite with composite index and 1,000 CTFEvent rows.""" + """In-memory SQLite with composite index and 1,000 CTFEvent rows. + + StaticPool ensures all connections (create_all, index creation, session) + share the same underlying connection so they all see the same in-memory DB. + """ engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, + poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) - # Create the composite index from the migration + # Create the composite index matching the migration (namespace-first) with engine.connect() as conn: conn.execute( text( "CREATE INDEX IF NOT EXISTS idx_ctf_event_session_ts_type " - "ON ctf_events (session_id, timestamp, event_type)" + "ON ctf_events (namespace, session_id, timestamp, event_type)" ) ) conn.commit() @@ -57,7 +66,6 @@ def bench_db(): rows = [] for i in range(BENCHMARK_ROWS): - # Alternate between two event types so the matching step is near the end event_type = ( "agent.fraud.tool_call_success" if i % 2 == 0 else "agent.payments.tool_call_success" @@ -89,7 +97,8 @@ def bench_db(): Base.metadata.drop_all(bind=engine) -def test_session_window_query_p95(bench_db): +@pytest.mark.asyncio +async def test_session_window_query_p95(bench_db): """p95 latency for check_event over 1,000-row session must be < 10ms.""" session, namespace, session_id = bench_db @@ -122,14 +131,10 @@ def test_session_window_query_p95(bench_db): "timestamp": "2026-06-01T00:20:00Z", } - import asyncio - latencies_ms: list[float] = [] for _ in range(BENCHMARK_RUNS): t0 = time.perf_counter() - asyncio.get_event_loop().run_until_complete( - det.check_event(trigger_event, session) - ) + await det.check_event(trigger_event, session) latencies_ms.append((time.perf_counter() - t0) * 1000) latencies_ms.sort() @@ -140,6 +145,7 @@ def test_session_window_query_p95(bench_db): print(f" p50: {p50:.2f}ms p95: {p95:.2f}ms limit: {P95_LIMIT_MS}ms") assert p95 < P95_LIMIT_MS, ( - f"p95 latency {p95:.2f}ms exceeds {P95_LIMIT_MS}ms limit. " - f"Check that idx_ctf_event_session_ts_type is applied." + f"p95 latency {p95:.2f}ms exceeds SQLite limit of {P95_LIMIT_MS}ms. " + f"Check that idx_ctf_event_session_ts_type is applied. " + f"Production PostgreSQL target is 10ms p95." )