diff --git a/src/interfaces/chat_app/app.py b/src/interfaces/chat_app/app.py index 6c3e877d..71b631f4 100644 --- a/src/interfaces/chat_app/app.py +++ b/src/interfaces/chat_app/app.py @@ -80,6 +80,7 @@ ) from src.utils.rbac.permissions import get_permission_context from src.utils.rbac.audit import log_authentication_event +from src.utils.rbac.jwt_parser import decode_jwt_claims logger = get_logger(__name__) @@ -2517,19 +2518,93 @@ def get_user(self): 'permissions': get_permission_context() }) + def _authenticate_bearer_token(self): + """Authenticate via Bearer token in Authorization header. + + If a valid Bearer token is present, decode the JWT claims, + extract user info and roles, and populate the Flask session + so downstream code works identically to SSO-cookie auth. + + Returns True if authentication succeeded, False otherwise. + """ + auth_header = request.headers.get('Authorization', '') + if not auth_header.startswith('Bearer '): + return False + + token_string = auth_header[7:] + claims = decode_jwt_claims(token_string) + if not claims: + return False + + # Check expiration + import time as _time + exp = claims.get('exp') + if exp and _time.time() > exp: + logger.warning("Bearer token has expired") + return False + + # Extract user info from JWT claims (Keycloak / CERN SSO format) + email = claims.get('email', claims.get('preferred_username', 'unknown')) + name = claims.get('name', claims.get('preferred_username', '')) + username = claims.get('preferred_username', claims.get('email', '')) + user_id = claims.get('sub', '') + + # Extract roles via the existing RBAC helper + user_roles = get_user_roles(claims, email) + + # Upsert user into the users table so that conversation_metadata + # can reference user_id via the FK constraint. + if user_id: + try: + user_service = UserService(pg_config=self.pg_config) + user_service.get_or_create_user( + user_id=user_id, + auth_provider='sso', + display_name=name, + email=email, + ) + except Exception as ue: + logger.warning(f"Failed to upsert bearer user {user_id} into users table: {ue}") + + # Populate the session so all downstream code works unchanged + self._set_user_session( + email=email, + name=name, + username=username, + user_id=user_id, + auth_method='bearer', + roles=user_roles + ) + + log_authentication_event( + user=email, + event_type='bearer_auth', + success=True, + method='bearer', + details=f"Roles: {user_roles}" + ) + return True + def require_auth(self, f): """Decorator to require authentication for routes. - + When SSO is enabled and anonymous access is blocked (sso.allow_anonymous: false), unauthenticated users are redirected to SSO login instead of getting a 401 error. + + Also supports Bearer token authentication via the Authorization header, + allowing programmatic / API access without a browser session. """ @wraps(f) def decorated_function(*args, **kwargs): if not self.auth_enabled: # If auth is not enabled, allow access return f(*args, **kwargs) - + if not session.get('logged_in'): + # Try Bearer token authentication for API requests + if self._authenticate_bearer_token(): + return f(*args, **kwargs) + # Check if SSO is enabled and anonymous access is blocked if self.sso_enabled: registry = get_registry() @@ -2542,16 +2617,18 @@ def decorated_function(*args, **kwargs): method='web', details=f"path={request.path}, method={request.method}" ) + # For API requests return 401 instead of redirect + if request.path.startswith('/api/'): + return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 # Redirect to login page which will trigger SSO return redirect(url_for('login')) - + # Return 401 Unauthorized response for API requests - return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 if request.path.startswith('/api/'): return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 - else: + else: return redirect(url_for('login')) - + return f(*args, **kwargs) return decorated_function @@ -2574,14 +2651,18 @@ def decorated_function(*args, **kwargs): # First check authentication if not self.auth_enabled: return f(*args, **kwargs) - + if not session.get('logged_in'): - if self.sso_enabled: - registry = get_registry() - if not registry.allow_anonymous: - return redirect(url_for('login')) - return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 - + # Try Bearer token authentication + if not self._authenticate_bearer_token(): + if self.sso_enabled: + registry = get_registry() + if not registry.allow_anonymous: + if request.path.startswith('/api/'): + return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 + return redirect(url_for('login')) + return jsonify({'error': 'Unauthorized', 'message': 'Authentication required'}), 401 + # Now check permission roles = session.get('roles', []) if not has_permission(permission, roles): @@ -3548,7 +3629,7 @@ def get_chat_response(self): 'conversation_id': conversation_id, 'archi_msg_id': message_ids[-1], 'server_response_msg_ts': timestamps['server_response_msg_ts'].timestamp(), - 'model_used': self.current_model_used, + 'model_used': self.chat.current_model_used, 'final_response_msg_ts': datetime.now(timezone.utc).timestamp(), }