-
Notifications
You must be signed in to change notification settings - Fork 60
feat: add context-aware adaptive risk scoring #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
varshini-nandula
wants to merge
2
commits into
Devnil434:main
Choose a base branch
from
varshini-nandula:feat/context-aware-risk-scoring
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| # Context-Aware Adaptive Risk Scoring Policy | ||
| # ============================================ | ||
| # Weights control how much each contextual signal contributes to the | ||
| # final risk score. They must sum to 1.0 for correct normalisation. | ||
| # | ||
| # To customise scoring behaviour, adjust the weight values below. | ||
| # The scorer will raise an error on startup if weights do not sum to 1.0. | ||
|
|
||
| risk_scoring: | ||
| restricted_zone: | ||
| weight: 0.30 | ||
| description: "Subject is inside or entered a restricted zone" | ||
|
|
||
| repeated_approach: | ||
| weight: 0.25 | ||
| description: "Subject has approached the same zone multiple times" | ||
|
|
||
| loitering: | ||
| weight: 0.15 | ||
| description: "Subject has been dwelling/lingering beyond threshold" | ||
|
|
||
| after_hours: | ||
| weight: 0.20 | ||
| description: "Activity detected outside normal operating hours" | ||
|
|
||
| reasoning_confidence: | ||
| weight: 0.10 | ||
| description: "LLM reasoning confidence as a risk amplifier" | ||
|
|
||
| # Thresholds for risk level classification (applied to 0-100 scale) | ||
| risk_levels: | ||
| low_max: 40 | ||
| medium_max: 70 | ||
|
|
||
| # Normalization parameters for continuous signals | ||
| normalization: | ||
| loitering_max_seconds: 120.0 | ||
| repeated_approach_max_count: 5 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| """ | ||
| Adaptive Risk Scoring Engine for Eagle Surveillance. | ||
|
|
||
| Loads configurable weights from a YAML policy file and computes a normalised | ||
| 0β100 risk score from multiple contextual signals. Designed as a drop-in | ||
| replacement for the hardcoded severity weights previously defined in | ||
| ``services.reasoning.pipeline._W``. | ||
|
|
||
| Usage | ||
| ----- | ||
| from services.reasoning.risk_scoring import AdaptiveRiskScorer | ||
|
|
||
| scorer = AdaptiveRiskScorer("configs/risk_policy.yaml") | ||
| result = scorer.score({ | ||
| "restricted_zone": True, | ||
| "repeated_approach": 3, | ||
| "loitering": 45.0, | ||
| "after_hours": False, | ||
| "reasoning_confidence": 0.85, | ||
| }) | ||
| # result == {"risk_score": 62, "risk_level": "Medium", "risk_factors": [...]} | ||
| """ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from pathlib import Path | ||
| from typing import Any, TypedDict | ||
|
|
||
| import yaml | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Default policy path relative to the project root | ||
| _DEFAULT_POLICY_PATH = Path(__file__).resolve().parents[2] / "configs" / "risk_policy.yaml" | ||
|
|
||
| # Human-readable descriptions for each signal | ||
| _FACTOR_LABELS: dict[str, str] = { | ||
| "restricted_zone": "Restricted zone detected", | ||
| "repeated_approach": "Repeated approach behavior", | ||
| "loitering": "Loitering / extended dwell time", | ||
| "after_hours": "After-hours activity", | ||
| "reasoning_confidence": "High reasoning confidence", | ||
| } | ||
|
|
||
|
|
||
| class RiskScoringResult(TypedDict): | ||
| """Typed result returned by :meth:`AdaptiveRiskScorer.score`.""" | ||
|
|
||
| risk_score: int | ||
| risk_level: str | ||
| risk_factors: list[str] | ||
|
|
||
|
|
||
| class AdaptiveRiskScorer: | ||
| """Context-aware risk scoring engine. | ||
|
|
||
| Loads configurable weights from a YAML policy file and computes a | ||
| normalised 0β100 risk score from multiple contextual signals. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| policy_path: | ||
| Path to the YAML policy file. If ``None``, uses the default | ||
| ``configs/risk_policy.yaml`` relative to the project root. | ||
|
|
||
| Raises | ||
| ------ | ||
| FileNotFoundError | ||
| If the policy YAML file does not exist. | ||
| ValueError | ||
| If the YAML file is malformed or has an invalid structure. | ||
| """ | ||
|
|
||
| def __init__(self, policy_path: str | Path | None = None) -> None: | ||
| resolved = Path(policy_path) if policy_path else _DEFAULT_POLICY_PATH | ||
| self._policy = self._load_policy(resolved) | ||
| self._weights: dict[str, float] = { | ||
| key: cfg["weight"] | ||
| for key, cfg in self._policy["risk_scoring"].items() | ||
| } | ||
| self._risk_levels: dict[str, int] = self._policy["risk_levels"] | ||
| self._normalization: dict[str, float] = self._policy["normalization"] | ||
|
|
||
| # ββ Public API ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | ||
|
|
||
| def score(self, signals: dict[str, Any]) -> RiskScoringResult: | ||
| """Compute a context-aware risk score from raw contextual signals. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| signals: | ||
| Dictionary of raw signal values. Expected keys: | ||
|
|
||
| - ``restricted_zone`` β ``bool`` | ||
| - ``repeated_approach`` β ``int`` (entry count) | ||
| - ``loitering`` β ``float`` (dwell seconds) | ||
| - ``after_hours`` β ``bool`` | ||
| - ``reasoning_confidence`` β ``float`` in [0, 1] | ||
|
|
||
| Missing keys are treated as zero / inactive. | ||
|
|
||
| Returns | ||
| ------- | ||
| RiskScoringResult | ||
| A dict with ``risk_score`` (0β100), ``risk_level`` | ||
| (``"Low"`` / ``"Medium"`` / ``"High"``), and | ||
| ``risk_factors`` (list of human-readable strings). | ||
| """ | ||
| normalized = self._normalize_signals(signals) | ||
| raw_score = self._compute_weighted_score(normalized) | ||
| score_100 = round(raw_score * 100) | ||
| score_100 = max(0, min(score_100, 100)) | ||
|
|
||
| return RiskScoringResult( | ||
| risk_score=score_100, | ||
| risk_level=self._classify_risk_level(score_100), | ||
| risk_factors=self._identify_contributing_factors(normalized), | ||
| ) | ||
|
|
||
| # ββ Internal helpers ββββββββββββββββββββββββββββββββββββββββββββββββββ | ||
|
|
||
| def _normalize_signals(self, signals: dict[str, Any]) -> dict[str, float]: | ||
| """Normalise each raw signal to a value in [0.0, 1.0]. | ||
|
|
||
| Boolean signals map to 1.0 / 0.0. Continuous signals are clamped | ||
| against the normalization parameters defined in the policy YAML. | ||
| """ | ||
| loitering_max = self._normalization["loitering_max_seconds"] | ||
| approach_max = self._normalization["repeated_approach_max_count"] | ||
|
|
||
| restricted = signals.get("restricted_zone", False) | ||
| approach_count = signals.get("repeated_approach", 0) | ||
| dwell = signals.get("loitering", 0.0) | ||
| after_hours = signals.get("after_hours", False) | ||
| confidence = signals.get("reasoning_confidence", 0.0) | ||
|
|
||
| return { | ||
| "restricted_zone": 1.0 if restricted else 0.0, | ||
| "repeated_approach": max(0.0, min(approach_count / approach_max, 1.0)) if approach_max > 0 else 0.0, | ||
| "loitering": max(0.0, min(dwell / loitering_max, 1.0)) if loitering_max > 0 else 0.0, | ||
| "after_hours": 1.0 if after_hours else 0.0, | ||
| "reasoning_confidence": max(0.0, min(float(confidence), 1.0)), | ||
| } | ||
|
|
||
| def _compute_weighted_score(self, normalized: dict[str, float]) -> float: | ||
| """Apply YAML weights to normalised signals and return a 0β1 float.""" | ||
| total = 0.0 | ||
| for key, weight in self._weights.items(): | ||
| total += weight * normalized.get(key, 0.0) | ||
| return min(total, 1.0) | ||
|
|
||
| def _classify_risk_level(self, score_100: int) -> str: | ||
| """Classify a 0β100 score into Low / Medium / High.""" | ||
| if score_100 <= self._risk_levels["low_max"]: | ||
| return "Low" | ||
| if score_100 <= self._risk_levels["medium_max"]: | ||
| return "Medium" | ||
| return "High" | ||
|
|
||
| def _identify_contributing_factors( | ||
| self, normalized: dict[str, float] | ||
| ) -> list[str]: | ||
| """Return human-readable labels for signals that contributed.""" | ||
| # Confidence threshold for reporting β only flag genuinely high values | ||
| _CONFIDENCE_HIGH_THRESHOLD = 0.7 | ||
|
|
||
| factors: list[str] = [] | ||
| for key, value in normalized.items(): | ||
| if value > 0.0 and key in _FACTOR_LABELS: | ||
| # "High reasoning confidence" should only appear when | ||
| # confidence is genuinely high, not for any nonzero value. | ||
| if key == "reasoning_confidence" and value < _CONFIDENCE_HIGH_THRESHOLD: | ||
| continue | ||
| factors.append(_FACTOR_LABELS[key]) | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| return factors | ||
|
|
||
| # ββ YAML loading ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | ||
|
|
||
| @staticmethod | ||
| def _load_policy(path: Path) -> dict[str, Any]: | ||
| """Load and validate the YAML policy file. | ||
|
|
||
| Raises | ||
| ------ | ||
| FileNotFoundError | ||
| If *path* does not exist. | ||
| ValueError | ||
| If the YAML is empty, unparseable, missing required sections, | ||
| or contains invalid weight / threshold values. | ||
| """ | ||
| if not path.exists(): | ||
| raise FileNotFoundError(f"Risk policy file not found: {path}") | ||
|
|
||
| try: | ||
| with open(path, "r", encoding="utf-8") as fh: | ||
| data = yaml.safe_load(fh) | ||
| except yaml.YAMLError as exc: | ||
| raise ValueError(f"Invalid YAML in risk policy: {exc}") from exc | ||
|
|
||
| if data is None: | ||
| raise ValueError(f"Risk policy file is empty: {path}") | ||
|
|
||
| for section in ("risk_scoring", "risk_levels", "normalization"): | ||
| if section not in data: | ||
| raise ValueError( | ||
| f"Risk policy missing required section: '{section}'" | ||
| ) | ||
|
|
||
| # Lightweight weight validation | ||
| for signal, cfg in data["risk_scoring"].items(): | ||
| w = cfg.get("weight", 0) | ||
| if not isinstance(w, (int, float)) or w < 0: | ||
| raise ValueError( | ||
| f"Weight for '{signal}' must be a non-negative number, got {w!r}" | ||
| ) | ||
| if w > 1.0: | ||
| raise ValueError( | ||
| f"Weight for '{signal}' must not exceed 1.0, got {w}" | ||
| ) | ||
|
|
||
| # Risk-level threshold ordering | ||
| levels = data["risk_levels"] | ||
| if levels["low_max"] >= levels["medium_max"]: | ||
| raise ValueError( | ||
| f"risk_levels.low_max ({levels['low_max']}) must be less than " | ||
| f"medium_max ({levels['medium_max']})" | ||
| ) | ||
|
|
||
| return data | ||
|
Comment on lines
+203
to
+229
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enforce full policy schema constraints at load-time.
Proposed hard validation in loader for section in ("risk_scoring", "risk_levels", "normalization"):
if section not in data:
raise ValueError(
f"Risk policy missing required section: '{section}'"
)
+
+ weights = []
+ for signal, cfg in data["risk_scoring"].items():
+ weight = cfg.get("weight")
+ if not isinstance(weight, (int, float)) or weight < 0:
+ raise ValueError(f"Invalid weight for '{signal}': {weight}")
+ weights.append(float(weight))
+ if abs(sum(weights) - 1.0) > 1e-6:
+ raise ValueError("Risk policy weights must sum to 1.0")
+
+ low_max = data["risk_levels"].get("low_max")
+ medium_max = data["risk_levels"].get("medium_max")
+ if not (isinstance(low_max, (int, float)) and isinstance(medium_max, (int, float))):
+ raise ValueError("risk_levels.low_max and medium_max must be numeric")
+ if not (0 <= low_max <= medium_max <= 100):
+ raise ValueError("Risk thresholds must satisfy 0 <= low_max <= medium_max <= 100")
+
+ loitering_max = data["normalization"].get("loitering_max_seconds")
+ approach_max = data["normalization"].get("repeated_approach_max_count")
+ if not (isinstance(loitering_max, (int, float)) and loitering_max > 0):
+ raise ValueError("normalization.loitering_max_seconds must be > 0")
+ if not (isinstance(approach_max, (int, float)) and approach_max > 0):
+ raise ValueError("normalization.repeated_approach_max_count must be > 0")
return dataπ€ Prompt for AI Agents |
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
π§© Analysis chain
π Script executed:
Repository: Devnil434/Eagle
Length of output: 14533
π Script executed:
Repository: Devnil434/Eagle
Length of output: 148
Add PyYAML to services/reasoning/requirements.txt.
The module imports
yamlbut PyYAML is not listed in the service's dependency manifest. Ifservices/reasoning/is deployed independently, startup will fail withModuleNotFoundError. Addpyyaml>=6.0toservices/reasoning/requirements.txt.π€ Prompt for AI Agents