diff --git a/app/src/api/alerts.ts b/app/src/api/alerts.ts new file mode 100644 index 00000000..65bd0b13 --- /dev/null +++ b/app/src/api/alerts.ts @@ -0,0 +1,36 @@ +import { api } from './client'; + +export interface LoginAlert { + id: number; + alert_type: string; + severity: string; + message: string; + read: boolean; + created_at: string; +} + +export interface AlertsResponse { + alerts: LoginAlert[]; +} + +export interface UnreadCountResponse { + unread_count: number; +} + +export async function getAlerts(unread = false, limit = 50): Promise { + const params = new URLSearchParams(); + if (unread) params.set('unread', 'true'); + params.set('limit', String(limit)); + return api(`/alerts/?${params.toString()}`); +} + +export async function getUnreadCount(): Promise { + return api('/alerts/unread-count'); +} + +export async function markAlertsRead(alertIds?: number[]): Promise<{ marked_read: number }> { + return api('/alerts/read', { + method: 'POST', + body: alertIds ? { alert_ids: alertIds } : {}, + }); +} diff --git a/app/src/components/SecurityAlerts.tsx b/app/src/components/SecurityAlerts.tsx new file mode 100644 index 00000000..3c17d98e --- /dev/null +++ b/app/src/components/SecurityAlerts.tsx @@ -0,0 +1,174 @@ +import { useEffect, useState } from 'react'; +import { getAlerts, markAlertsRead, LoginAlert } from '@/api/alerts'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { Button } from '@/components/ui/button'; +import { Shield, ShieldAlert, ShieldCheck, ShieldQuestion, Check } from 'lucide-react'; +import { toast } from 'sonner'; + +const SEVERITY_COLORS: Record = { + high: 'bg-red-100 text-red-800 border-red-200', + medium: 'bg-yellow-100 text-yellow-800 border-yellow-200', + low: 'bg-blue-100 text-blue-800 border-blue-200', +}; + +const ALERT_ICONS: Record = { + brute_force: ShieldAlert, + credential_stuffing: ShieldAlert, + new_ip: ShieldQuestion, + new_device: ShieldQuestion, +}; + +function formatRelativeTime(dateStr: string): string { + const date = new Date(dateStr); + const now = new Date(); + const diffMs = now.getTime() - date.getTime(); + const diffMin = Math.floor(diffMs / 60000); + if (diffMin < 1) return 'Just now'; + if (diffMin < 60) return `${diffMin}m ago`; + const diffHr = Math.floor(diffMin / 60); + if (diffHr < 24) return `${diffHr}h ago`; + const diffDays = Math.floor(diffHr / 24); + if (diffDays < 7) return `${diffDays}d ago`; + return date.toLocaleDateString(); +} + +function AlertItem({ + alert, + onMarkRead, +}: { + alert: LoginAlert; + onMarkRead: (id: number) => void; +}) { + const Icon = ALERT_ICONS[alert.alert_type] || Shield; + return ( +
+
+ +
+
+
+ + {alert.severity} + + + {formatRelativeTime(alert.created_at)} + + {!alert.read && ( + + )} +
+

{alert.message}

+
+ {!alert.read && ( + + )} +
+ ); +} + +export default function SecurityAlerts() { + const [alerts, setAlerts] = useState([]); + const [loading, setLoading] = useState(true); + const [filter, setFilter] = useState<'all' | 'unread'>('all'); + + const fetchAlerts = async () => { + try { + const data = await getAlerts(filter === 'unread'); + setAlerts(data.alerts); + } catch { + toast.error('Failed to load security alerts'); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + fetchAlerts(); + }, [filter]); + + const handleMarkRead = async (id: number) => { + try { + await markAlertsRead([id]); + setAlerts((prev) => + prev.map((a) => (a.id === id ? { ...a, read: true } : a)) + ); + } catch { + toast.error('Failed to mark alert as read'); + } + }; + + const handleMarkAllRead = async () => { + try { + await markAlertsRead(); + setAlerts((prev) => prev.map((a) => ({ ...a, read: true }))); + toast.success('All alerts marked as read'); + } catch { + toast.error('Failed to mark alerts as read'); + } + }; + + const unreadCount = alerts.filter((a) => !a.read).length; + + return ( + + +
+ + Security Alerts + {unreadCount > 0 && ( + {unreadCount} unread + )} +
+
+ + + {unreadCount > 0 && ( + + )} +
+
+ + {loading ? ( +
Loading...
+ ) : alerts.length === 0 ? ( +
+ +

No security alerts

+
+ ) : ( +
+ {alerts.map((alert) => ( + + ))} +
+ )} +
+
+ ); +} diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45..f55cceed 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -118,3 +118,44 @@ def _ensure_schema_compatibility(app: Flask) -> None: conn.rollback() finally: conn.close() + + # Create login anomaly detection tables if they don't exist + conn = db.engine.raw_connection() + try: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS login_attempts ( + id SERIAL PRIMARY KEY, + user_id INT REFERENCES users(id) ON DELETE SET NULL, + email VARCHAR(255) NOT NULL, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + success BOOLEAN NOT NULL DEFAULT FALSE, + failure_reason VARCHAR(100), + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_login_attempts_user_id ON login_attempts(user_id); + CREATE INDEX IF NOT EXISTS idx_login_attempts_email_success ON login_attempts(email, success); + CREATE INDEX IF NOT EXISTS idx_login_attempts_created_at ON login_attempts(created_at DESC); + + CREATE TABLE IF NOT EXISTS login_alerts ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + alert_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL DEFAULT 'medium', + message VARCHAR(500) NOT NULL, + metadata_json TEXT, + read BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_login_alerts_user_read ON login_alerts(user_id, read); + CREATE INDEX IF NOT EXISTS idx_login_alerts_created_at ON login_alerts(created_at DESC); + """ + ) + conn.commit() + except Exception: + app.logger.exception("Schema compatibility patch failed for login anomaly tables") + conn.rollback() + finally: + conn.close() diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..47944fe3 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,31 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +-- Login anomaly detection tables +CREATE TABLE IF NOT EXISTS login_attempts ( + id SERIAL PRIMARY KEY, + user_id INT REFERENCES users(id) ON DELETE SET NULL, + email VARCHAR(255) NOT NULL, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + success BOOLEAN NOT NULL DEFAULT FALSE, + failure_reason VARCHAR(100), + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_login_attempts_user_id ON login_attempts(user_id); +CREATE INDEX IF NOT EXISTS idx_login_attempts_email_success ON login_attempts(email, success); +CREATE INDEX IF NOT EXISTS idx_login_attempts_created_at ON login_attempts(created_at DESC); + +CREATE TABLE IF NOT EXISTS login_alerts ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + alert_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL DEFAULT 'medium', + message VARCHAR(500) NOT NULL, + metadata_json TEXT, + read BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_login_alerts_user_read ON login_alerts(user_id, read); +CREATE INDEX IF NOT EXISTS idx_login_alerts_created_at ON login_alerts(created_at DESC); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..a7a1b57f 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,31 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class LoginAttempt(db.Model): + """Tracks every login attempt for anomaly detection.""" + + __tablename__ = "login_attempts" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) + email = db.Column(db.String(255), nullable=False) + ip_address = db.Column(db.String(45), nullable=True) + user_agent = db.Column(db.String(500), nullable=True) + success = db.Column(db.Boolean, nullable=False, default=False) + failure_reason = db.Column(db.String(100), nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class LoginAlert(db.Model): + """Stores anomaly alerts triggered by suspicious login behavior.""" + + __tablename__ = "login_alerts" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + alert_type = db.Column(db.String(50), nullable=False) + severity = db.Column(db.String(20), nullable=False, default="medium") + message = db.Column(db.String(500), nullable=False) + metadata_json = db.Column(db.Text, nullable=True) + read = db.Column(db.Boolean, nullable=False, default=False) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..ca4a700c 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .alerts import bp as alerts_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(alerts_bp, url_prefix="/alerts") diff --git a/packages/backend/app/routes/alerts.py b/packages/backend/app/routes/alerts.py new file mode 100644 index 00000000..7ccdc05a --- /dev/null +++ b/packages/backend/app/routes/alerts.py @@ -0,0 +1,50 @@ +"""Login security alert management endpoints.""" + +from flask import Blueprint, request, jsonify +from flask_jwt_extended import jwt_required, get_jwt_identity +from ..services.login_anomaly import get_user_alerts, mark_alerts_read + +bp = Blueprint("alerts", __name__) + + +@bp.get("/") +@jwt_required() +def list_alerts(): + """List login security alerts for the current user.""" + uid = int(get_jwt_identity()) + unread_only = request.args.get("unread", "false").lower() == "true" + limit = min(int(request.args.get("limit", 50)), 100) + alerts = get_user_alerts(uid, unread_only=unread_only, limit=limit) + return jsonify( + alerts=[ + { + "id": a.id, + "alert_type": a.alert_type, + "severity": a.severity, + "message": a.message, + "read": a.read, + "created_at": a.created_at.isoformat(), + } + for a in alerts + ] + ) + + +@bp.post("/read") +@jwt_required() +def mark_read(): + """Mark alerts as read. Body: {\"alert_ids\": [1,2,3]} or omit for all.""" + uid = int(get_jwt_identity()) + data = request.get_json() or {} + alert_ids = data.get("alert_ids") + count = mark_alerts_read(uid, alert_ids) + return jsonify(marked_read=count) + + +@bp.get("/unread-count") +@jwt_required() +def unread_count(): + """Get count of unread alerts.""" + uid = int(get_jwt_identity()) + alerts = get_user_alerts(uid, unread_only=True) + return jsonify(unread_count=len(alerts)) diff --git a/packages/backend/app/routes/auth.py b/packages/backend/app/routes/auth.py index 05a39377..bcf2ed38 100644 --- a/packages/backend/app/routes/auth.py +++ b/packages/backend/app/routes/auth.py @@ -10,6 +10,7 @@ ) from ..extensions import db, redis_client from ..models import User +from ..services.login_anomaly import record_login_attempt, detect_anomalies import logging import time @@ -28,6 +29,13 @@ } +def _client_ip() -> str | None: + """Extract client IP from request, respecting proxy headers.""" + if request.headers.get("X-Forwarded-For"): + return request.headers["X-Forwarded-For"].split(",")[0].strip() + return request.remote_addr + + @bp.post("/register") def register(): data = request.get_json() or {} @@ -55,15 +63,55 @@ def login(): data = request.get_json() or {} email = data.get("email") password = data.get("password") + ip_address = _client_ip() + user_agent = request.headers.get("User-Agent") + user = db.session.query(User).filter_by(email=email).first() if not user or not check_password_hash(user.password_hash, password): + # Record failed attempt and check for anomalies + attempt = record_login_attempt( + email=email or "", + success=False, + ip_address=ip_address, + user_agent=user_agent, + failure_reason="invalid_credentials", + ) + try: + detect_anomalies(attempt) + except Exception: + logger.exception("Anomaly detection failed for failed login") logger.warning("Login failed for email=%s", email) return jsonify(error="invalid credentials"), 401 + + # Record successful attempt and check for anomalies + attempt = record_login_attempt( + email=email, + success=True, + ip_address=ip_address, + user_agent=user_agent, + ) + alerts = [] + try: + alerts = detect_anomalies(attempt) + except Exception: + logger.exception("Anomaly detection failed for successful login") + access = create_access_token(identity=str(user.id)) refresh = create_refresh_token(identity=str(user.id)) _store_refresh_session(refresh, str(user.id)) logger.info("Login success user_id=%s", user.id) - return jsonify(access_token=access, refresh_token=refresh) + + response_data = {"access_token": access, "refresh_token": refresh} + if alerts: + response_data["security_alerts"] = [ + { + "type": a.alert_type, + "severity": a.severity, + "message": a.message, + } + for a in alerts + ] + return jsonify(response_data) @bp.get("/me") diff --git a/packages/backend/app/services/login_anomaly.py b/packages/backend/app/services/login_anomaly.py new file mode 100644 index 00000000..4999fefb --- /dev/null +++ b/packages/backend/app/services/login_anomaly.py @@ -0,0 +1,259 @@ +"""Login anomaly detection service. + +Detects suspicious login patterns and generates alerts: +- Rapid failed login attempts (brute force) +- Logins from new IP addresses +- Logins from new devices (user agent) +- Unusual login time patterns +- Multiple failed attempts followed by success (credential stuffing) +""" + +import json +import hashlib +from datetime import datetime, timedelta +from collections import Counter + +from ..extensions import db, redis_client +from ..models import LoginAttempt, LoginAlert, User + +# Detection thresholds +BRUTE_FORCE_WINDOW_MINUTES = 15 +BRUTE_FORCE_THRESHOLD = 5 +NEW_IP_LOOKBACK_DAYS = 30 +NEW_DEVICE_LOOKBACK_DAYS = 90 +CREDENTIAL_STUFFING_WINDOW_MINUTES = 30 +CREDENTIAL_STUFFING_THRESHOLD = 3 + + +def _hash_ua(user_agent: str | None) -> str: + """Hash user agent to a short fingerprint for comparison.""" + if not user_agent: + return "unknown" + return hashlib.sha256(user_agent.encode()).hexdigest()[:16] + + +def record_login_attempt( + email: str, + success: bool, + ip_address: str | None = None, + user_agent: str | None = None, + failure_reason: str | None = None, +) -> LoginAttempt: + """Record a login attempt and return the created record.""" + user = db.session.query(User).filter_by(email=email).first() + attempt = LoginAttempt( + user_id=user.id if user else None, + email=email, + ip_address=ip_address, + user_agent=user_agent, + success=success, + failure_reason=failure_reason, + ) + db.session.add(attempt) + db.session.commit() + + # Cache recent attempt count in Redis for fast brute-force detection + if not success: + key = f"login:fail:{email}" + try: + pipe = redis_client.pipeline() + pipe.incr(key) + pipe.expire(key, BRUTE_FORCE_WINDOW_MINUTES * 60) + pipe.execute() + except Exception: + pass + + return attempt + + +def detect_anomalies(attempt: LoginAttempt) -> list[LoginAlert]: + """Run all anomaly checks on a login attempt. Returns new alerts created.""" + alerts = [] + + if attempt.success and attempt.user_id: + alerts.extend(_detect_new_ip(attempt)) + alerts.extend(_detect_new_device(attempt)) + alerts.extend(_detect_credential_stuffing(attempt)) + + if not attempt.success: + alerts.extend(_detect_brute_force(attempt)) + + for alert in alerts: + db.session.add(alert) + if alerts: + db.session.commit() + + return alerts + + +def _detect_brute_force(attempt: LoginAttempt) -> list[LoginAlert]: + """Detect rapid failed login attempts (brute force).""" + alerts = [] + # Use Redis counter for speed + key = f"login:fail:{attempt.email}" + try: + count = int(redis_client.get(key) or 0) + except Exception: + # Fallback to DB query + window = datetime.utcnow() - timedelta(minutes=BRUTE_FORCE_WINDOW_MINUTES) + count = ( + db.session.query(LoginAttempt) + .filter( + LoginAttempt.email == attempt.email, + LoginAttempt.success.is_(False), + LoginAttempt.created_at >= window, + ) + .count() + ) + + if count >= BRUTE_FORCE_THRESHOLD: + user = db.session.query(User).filter_by(email=attempt.email).first() + if user: + alert = LoginAlert( + user_id=user.id, + alert_type="brute_force", + severity="high", + message=f"{count} failed login attempts in the last {BRUTE_FORCE_WINDOW_MINUTES} minutes", + metadata_json=json.dumps( + { + "failed_count": count, + "window_minutes": BRUTE_FORCE_WINDOW_MINUTES, + "ip_address": attempt.ip_address, + } + ), + ) + alerts.append(alert) + return alerts + + +def _detect_new_ip(attempt: LoginAttempt) -> list[LoginAlert]: + """Detect login from a previously unseen IP address.""" + alerts = [] + if not attempt.ip_address or not attempt.user_id: + return alerts + + cutoff = datetime.utcnow() - timedelta(days=NEW_IP_LOOKBACK_DAYS) + known_ips = ( + db.session.query(LoginAttempt.ip_address) + .filter( + LoginAttempt.user_id == attempt.user_id, + LoginAttempt.success.is_(True), + LoginAttempt.created_at >= cutoff, + LoginAttempt.id != attempt.id, + ) + .distinct() + .all() + ) + known_ip_set = {row[0] for row in known_ips} + + if attempt.ip_address not in known_ip_set and len(known_ip_set) > 0: + alert = LoginAlert( + user_id=attempt.user_id, + alert_type="new_ip", + severity="medium", + message=f"Login from new IP address: {attempt.ip_address}", + metadata_json=json.dumps( + { + "new_ip": attempt.ip_address, + "known_ips_count": len(known_ip_set), + } + ), + ) + alerts.append(alert) + return alerts + + +def _detect_new_device(attempt: LoginAttempt) -> list[LoginAlert]: + """Detect login from a previously unseen device (user agent).""" + alerts = [] + if not attempt.user_agent or not attempt.user_id: + return alerts + + current_ua_hash = _hash_ua(attempt.user_agent) + cutoff = datetime.utcnow() - timedelta(days=NEW_DEVICE_LOOKBACK_DAYS) + + past_attempts = ( + db.session.query(LoginAttempt.user_agent) + .filter( + LoginAttempt.user_id == attempt.user_id, + LoginAttempt.success.is_(True), + LoginAttempt.created_at >= cutoff, + LoginAttempt.user_agent.isnot(None), + LoginAttempt.id != attempt.id, + ) + .distinct() + .all() + ) + known_ua_hashes = {_hash_ua(row[0]) for row in past_attempts} + + if current_ua_hash not in known_ua_hashes and len(known_ua_hashes) > 0: + alert = LoginAlert( + user_id=attempt.user_id, + alert_type="new_device", + severity="medium", + message="Login from a new device", + metadata_json=json.dumps( + { + "user_agent": attempt.user_agent[:200], + "known_devices_count": len(known_ua_hashes), + } + ), + ) + alerts.append(alert) + return alerts + + +def _detect_credential_stuffing(attempt: LoginAttempt) -> list[LoginAlert]: + """Detect multiple failed attempts followed by success (credential stuffing pattern).""" + alerts = [] + if not attempt.user_id: + return alerts + + window = datetime.utcnow() - timedelta(minutes=CREDENTIAL_STUFFING_WINDOW_MINUTES) + recent_failures = ( + db.session.query(LoginAttempt) + .filter( + LoginAttempt.user_id == attempt.user_id, + LoginAttempt.success.is_(False), + LoginAttempt.created_at >= window, + LoginAttempt.id != attempt.id, + ) + .count() + ) + + if recent_failures >= CREDENTIAL_STUFFING_THRESHOLD: + alert = LoginAlert( + user_id=attempt.user_id, + alert_type="credential_stuffing", + severity="high", + message=f"Successful login after {recent_failures} failed attempts — possible credential stuffing", + metadata_json=json.dumps( + { + "prior_failures": recent_failures, + "window_minutes": CREDENTIAL_STUFFING_WINDOW_MINUTES, + "ip_address": attempt.ip_address, + } + ), + ) + alerts.append(alert) + return alerts + + +def get_user_alerts( + user_id: int, unread_only: bool = False, limit: int = 50 +) -> list[LoginAlert]: + """Retrieve login alerts for a user.""" + query = db.session.query(LoginAlert).filter_by(user_id=user_id) + if unread_only: + query = query.filter_by(read=False) + return query.order_by(LoginAlert.created_at.desc()).limit(limit).all() + + +def mark_alerts_read(user_id: int, alert_ids: list[int] | None = None) -> int: + """Mark alerts as read. If alert_ids is None, mark all unread.""" + query = db.session.query(LoginAlert).filter_by(user_id=user_id, read=False) + if alert_ids: + query = query.filter(LoginAlert.id.in_(alert_ids)) + count = query.update({"read": True}, synchronize_session=False) + db.session.commit() + return count diff --git a/packages/backend/tests/test_login_anomaly.py b/packages/backend/tests/test_login_anomaly.py new file mode 100644 index 00000000..3d221990 --- /dev/null +++ b/packages/backend/tests/test_login_anomaly.py @@ -0,0 +1,379 @@ +"""Tests for login anomaly detection.""" + +import json +import pytest +from unittest.mock import patch +from app import create_app +from app.config import Settings +from app.extensions import db, redis_client +from app import models +from app.models import User, LoginAttempt, LoginAlert +from app.services.login_anomaly import ( + record_login_attempt, + detect_anomalies, + get_user_alerts, + mark_alerts_read, + _detect_brute_force, + _detect_new_ip, + _detect_new_device, + _detect_credential_stuffing, +) + + +class TestSettings(Settings): + database_url: str = "sqlite+pysqlite:///:memory:" + redis_url: str = "redis://localhost:6379/15" + jwt_secret: str = "test-secret-with-32-plus-chars-1234567890" + + +@pytest.fixture() +def app(): + settings = TestSettings() + app = create_app(settings) + app.config.update(TESTING=True) + with app.app_context(): + db.create_all() + try: + redis_client.flushdb() + except Exception: + pass + yield app + with app.app_context(): + db.session.remove() + db.drop_all() + try: + redis_client.flushdb() + except Exception: + pass + + +@pytest.fixture() +def client(app): + return app.test_client() + + +@pytest.fixture() +def user(app): + with app.app_context(): + user = User( + email="anomaly@test.com", + password_hash="hashed", + preferred_currency="USD", + ) + db.session.add(user) + db.session.commit() + return user.id + + +class TestRecordLoginAttempt: + def test_records_successful_attempt(self, app, user): + with app.app_context(): + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="192.168.1.1", + user_agent="Mozilla/5.0", + ) + assert attempt.id is not None + assert attempt.success is True + assert attempt.ip_address == "192.168.1.1" + assert attempt.user_id == user + + def test_records_failed_attempt(self, app, user): + with app.app_context(): + attempt = record_login_attempt( + email="anomaly@test.com", + success=False, + ip_address="10.0.0.1", + failure_reason="invalid_credentials", + ) + assert attempt.success is False + assert attempt.failure_reason == "invalid_credentials" + + def test_records_unknown_email(self, app): + with app.app_context(): + attempt = record_login_attempt( + email="nonexistent@test.com", + success=False, + ) + assert attempt.user_id is None + assert attempt.email == "nonexistent@test.com" + + +class TestBruteForceDetection: + def test_no_alert_below_threshold(self, app, user): + with app.app_context(): + for _ in range(3): + attempt = record_login_attempt( + email="anomaly@test.com", success=False + ) + alerts = _detect_brute_force(attempt) + assert len(alerts) == 0 + + def test_alert_at_threshold(self, app, user): + with app.app_context(): + for _ in range(5): + attempt = record_login_attempt( + email="anomaly@test.com", success=False + ) + alerts = _detect_brute_force(attempt) + assert len(alerts) == 1 + assert alerts[0].alert_type == "brute_force" + assert alerts[0].severity == "high" + + +class TestNewIPDetection: + def test_no_alert_first_login(self, app, user): + with app.app_context(): + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="1.2.3.4", + ) + alerts = _detect_new_ip(attempt) + assert len(alerts) == 0 + + def test_alert_on_new_ip(self, app, user): + with app.app_context(): + # First login from known IP + record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="1.2.3.4", + ) + # Login from new IP + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="5.6.7.8", + ) + alerts = _detect_new_ip(attempt) + assert len(alerts) == 1 + assert alerts[0].alert_type == "new_ip" + assert "5.6.7.8" in alerts[0].message + + def test_no_alert_same_ip(self, app, user): + with app.app_context(): + record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="1.2.3.4", + ) + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="1.2.3.4", + ) + alerts = _detect_new_ip(attempt) + assert len(alerts) == 0 + + +class TestNewDeviceDetection: + def test_alert_on_new_device(self, app, user): + with app.app_context(): + # First login + record_login_attempt( + email="anomaly@test.com", + success=True, + user_agent="Chrome/120.0", + ) + # New device + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + user_agent="Firefox/115.0", + ) + alerts = _detect_new_device(attempt) + assert len(alerts) == 1 + assert alerts[0].alert_type == "new_device" + + +class TestCredentialStuffingDetection: + def test_alert_after_failures_then_success(self, app, user): + with app.app_context(): + # 3 failed attempts + for _ in range(3): + record_login_attempt( + email="anomaly@test.com", success=False + ) + # Successful login + attempt = record_login_attempt( + email="anomaly@test.com", success=True + ) + alerts = _detect_credential_stuffing(attempt) + assert len(alerts) == 1 + assert alerts[0].alert_type == "credential_stuffing" + assert alerts[0].severity == "high" + + +class TestDetectAnomalies: + def test_combined_detection(self, app, user): + with app.app_context(): + # Establish baseline with one successful login + record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="1.2.3.4", + user_agent="Chrome/120", + ) + # New IP + new device + after failures + for _ in range(3): + record_login_attempt( + email="anomaly@test.com", success=False + ) + attempt = record_login_attempt( + email="anomaly@test.com", + success=True, + ip_address="9.9.9.9", + user_agent="TorBrowser/1.0", + ) + alerts = detect_anomalies(attempt) + alert_types = {a.alert_type for a in alerts} + assert "new_ip" in alert_types + assert "new_device" in alert_types + assert "credential_stuffing" in alert_types + + +class TestAlertManagement: + def test_get_user_alerts(self, app, user): + with app.app_context(): + # Create some alerts + for i in range(3): + alert = LoginAlert( + user_id=user, + alert_type="new_ip", + severity="medium", + message=f"Test alert {i}", + ) + db.session.add(alert) + db.session.commit() + + alerts = get_user_alerts(user) + assert len(alerts) == 3 + + alerts_unread = get_user_alerts(user, unread_only=True) + assert len(alerts_unread) == 3 + + def test_mark_alerts_read(self, app, user): + with app.app_context(): + alert = LoginAlert( + user_id=user, + alert_type="new_ip", + severity="medium", + message="Test", + ) + db.session.add(alert) + db.session.commit() + + count = mark_alerts_read(user, [alert.id]) + assert count == 1 + + alerts = get_user_alerts(user, unread_only=True) + assert len(alerts) == 0 + + +class TestAuthIntegration: + def test_login_records_attempt(self, client, user): + """Test that login endpoint records attempts and returns alerts.""" + # Failed login + r = client.post( + "/auth/login", + json={"email": "anomaly@test.com", "password": "wrong"}, + ) + assert r.status_code == 401 + + # Successful login + from werkzeug.security import generate_password_hash + with client.application.app_context(): + u = db.session.get(User, user) + u.password_hash = generate_password_hash("correct") + db.session.commit() + + # Mock redis for the login flow + with patch("app.routes.auth.redis_client") as mock_redis, \ + patch("app.services.login_anomaly.redis_client") as mock_redis2: + mock_redis.get.return_value = None + mock_redis.pipeline.return_value.execute.return_value = [1, True] + mock_redis2.get.return_value = None + mock_redis2.pipeline.return_value.execute.return_value = [1, True] + r = client.post( + "/auth/login", + json={"email": "anomaly@test.com", "password": "correct"}, + ) + assert r.status_code == 200 + data = r.get_json() + assert "access_token" in data + + def test_alerts_endpoint_requires_auth(self, client): + r = client.get("/alerts/") + assert r.status_code == 401 + + def test_alerts_endpoint(self, client, user): + from werkzeug.security import generate_password_hash + with client.application.app_context(): + u = db.session.get(User, user) + u.password_hash = generate_password_hash("pass123") + alert = LoginAlert( + user_id=user, + alert_type="new_ip", + severity="medium", + message="Test alert", + ) + db.session.add(alert) + db.session.commit() + + with patch("app.routes.auth.redis_client") as mock_redis, \ + patch("app.services.login_anomaly.redis_client") as mock_redis2: + mock_redis.get.return_value = None + mock_redis.pipeline.return_value.execute.return_value = [1, True] + mock_redis2.get.return_value = None + mock_redis2.pipeline.return_value.execute.return_value = [1, True] + r = client.post( + "/auth/login", + json={"email": "anomaly@test.com", "password": "pass123"}, + ) + token = r.get_json()["access_token"] + + r = client.get( + "/alerts/", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200 + data = r.get_json() + assert len(data["alerts"]) >= 1 + assert data["alerts"][0]["alert_type"] == "new_ip" + + def test_unread_count(self, client, user): + from werkzeug.security import generate_password_hash + with client.application.app_context(): + u = db.session.get(User, user) + u.password_hash = generate_password_hash("pass123") + for i in range(3): + db.session.add( + LoginAlert( + user_id=user, + alert_type="new_ip", + severity="medium", + message=f"Alert {i}", + ) + ) + db.session.commit() + + with patch("app.routes.auth.redis_client") as mock_redis, \ + patch("app.services.login_anomaly.redis_client") as mock_redis2: + mock_redis.get.return_value = None + mock_redis.pipeline.return_value.execute.return_value = [1, True] + mock_redis2.get.return_value = None + mock_redis2.pipeline.return_value.execute.return_value = [1, True] + r = client.post( + "/auth/login", + json={"email": "anomaly@test.com", "password": "pass123"}, + ) + token = r.get_json()["access_token"] + + r = client.get( + "/alerts/unread-count", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200 + assert r.get_json()["unread_count"] == 3