diff --git a/ruoyi-fastapi-backend/.env.dev b/ruoyi-fastapi-backend/.env.dev index b33684a..b8e470d 100644 --- a/ruoyi-fastapi-backend/.env.dev +++ b/ruoyi-fastapi-backend/.env.dev @@ -78,6 +78,21 @@ REDIS_PASSWORD = '' REDIS_DATABASE = 2 # -------- 日志配置 -------- +# 是否启用日志脱敏 +LOG_MASK_ENABLED = false +# 日志脱敏占位符 +LOG_MASK_PLACEHOLDER = '******' +# 以下三项为互补关系(非互斥): +# 1) LOG_MASK_FIELDS:命中后执行全量脱敏 +# 2) LOG_PARTIAL_MASK_FIELDS:命中后执行部分脱敏 +# 3) LOG_CONFIG_SECRET_PATTERNS:按 configKey 关键词决定是否脱敏 configValue +# 全量脱敏字段,多个值使用逗号分隔 +LOG_MASK_FIELDS = 'password,old_password,new_password,confirm_password,api_key,token,access_token,refresh_token,authorization,client_secret,secret,secret_key,private_key,private_key_pem,credential,credentials,sms_code,captcha_code,system_prompt' +# 部分脱敏字段,多个值使用逗号分隔 +# 当前默认不对IP脱敏;如后期需要开启,可直接追加 ip,ipaddr,oper_ip,login_ip 并重启服务 +LOG_PARTIAL_MASK_FIELDS = 'phonenumber,phone,mobile,email' +# 按配置键名识别敏感配置的关键词,多个值使用逗号分隔 +LOG_CONFIG_SECRET_PATTERNS = 'password,token,secret,key,private,credential,access,jwt,captcha,sms' # Redis Stream Key LOG_STREAM_KEY = 'log:stream' # Redis Stream 消费组名称 diff --git a/ruoyi-fastapi-backend/.env.dockermy b/ruoyi-fastapi-backend/.env.dockermy index 71481c9..db6df78 100644 --- a/ruoyi-fastapi-backend/.env.dockermy +++ b/ruoyi-fastapi-backend/.env.dockermy @@ -78,6 +78,21 @@ REDIS_PASSWORD = '' REDIS_DATABASE = 2 # -------- 日志配置 -------- +# 是否启用日志脱敏 +LOG_MASK_ENABLED = true +# 日志脱敏占位符 +LOG_MASK_PLACEHOLDER = '******' +# 以下三项为互补关系(非互斥): +# 1) LOG_MASK_FIELDS:命中后执行全量脱敏 +# 2) LOG_PARTIAL_MASK_FIELDS:命中后执行部分脱敏 +# 3) LOG_CONFIG_SECRET_PATTERNS:按 configKey 关键词决定是否脱敏 configValue +# 全量脱敏字段,多个值使用逗号分隔 +LOG_MASK_FIELDS = 'password,old_password,new_password,confirm_password,api_key,token,access_token,refresh_token,authorization,client_secret,secret,secret_key,private_key,private_key_pem,credential,credentials,sms_code,captcha_code,system_prompt' +# 部分脱敏字段,多个值使用逗号分隔 +# 当前默认不对IP脱敏;如后期需要开启,可直接追加 ip,ipaddr,oper_ip,login_ip 并重启服务 
+LOG_PARTIAL_MASK_FIELDS = 'phonenumber,phone,mobile,email' +# 按配置键名识别敏感配置的关键词,多个值使用逗号分隔 +LOG_CONFIG_SECRET_PATTERNS = 'password,token,secret,key,private,credential,access,jwt,captcha,sms' # Redis Stream Key LOG_STREAM_KEY = 'log:stream' # Redis Stream 消费组名称 diff --git a/ruoyi-fastapi-backend/.env.dockerpg b/ruoyi-fastapi-backend/.env.dockerpg index c00be41..cf1e480 100644 --- a/ruoyi-fastapi-backend/.env.dockerpg +++ b/ruoyi-fastapi-backend/.env.dockerpg @@ -78,6 +78,21 @@ REDIS_PASSWORD = '' REDIS_DATABASE = 2 # -------- 日志配置 -------- +# 是否启用日志脱敏 +LOG_MASK_ENABLED = true +# 日志脱敏占位符 +LOG_MASK_PLACEHOLDER = '******' +# 以下三项为互补关系(非互斥): +# 1) LOG_MASK_FIELDS:命中后执行全量脱敏 +# 2) LOG_PARTIAL_MASK_FIELDS:命中后执行部分脱敏 +# 3) LOG_CONFIG_SECRET_PATTERNS:按 configKey 关键词决定是否脱敏 configValue +# 全量脱敏字段,多个值使用逗号分隔 +LOG_MASK_FIELDS = 'password,old_password,new_password,confirm_password,api_key,token,access_token,refresh_token,authorization,client_secret,secret,secret_key,private_key,private_key_pem,credential,credentials,sms_code,captcha_code,system_prompt' +# 部分脱敏字段,多个值使用逗号分隔 +# 当前默认不对IP脱敏;如后期需要开启,可直接追加 ip,ipaddr,oper_ip,login_ip 并重启服务 +LOG_PARTIAL_MASK_FIELDS = 'phonenumber,phone,mobile,email' +# 按配置键名识别敏感配置的关键词,多个值使用逗号分隔 +LOG_CONFIG_SECRET_PATTERNS = 'password,token,secret,key,private,credential,access,jwt,captcha,sms' # Redis Stream Key LOG_STREAM_KEY = 'log:stream' # Redis Stream 消费组名称 diff --git a/ruoyi-fastapi-backend/.env.prod b/ruoyi-fastapi-backend/.env.prod index 24d0a84..4e4046c 100644 --- a/ruoyi-fastapi-backend/.env.prod +++ b/ruoyi-fastapi-backend/.env.prod @@ -78,6 +78,21 @@ REDIS_PASSWORD = '' REDIS_DATABASE = 2 # -------- 日志配置 -------- +# 是否启用日志脱敏 +LOG_MASK_ENABLED = true +# 日志脱敏占位符 +LOG_MASK_PLACEHOLDER = '******' +# 以下三项为互补关系(非互斥): +# 1) LOG_MASK_FIELDS:命中后执行全量脱敏 +# 2) LOG_PARTIAL_MASK_FIELDS:命中后执行部分脱敏 +# 3) LOG_CONFIG_SECRET_PATTERNS:按 configKey 关键词决定是否脱敏 configValue +# 全量脱敏字段,多个值使用逗号分隔 +LOG_MASK_FIELDS = 
'password,old_password,new_password,confirm_password,api_key,token,access_token,refresh_token,authorization,client_secret,secret,secret_key,private_key,private_key_pem,credential,credentials,sms_code,captcha_code,system_prompt' +# 部分脱敏字段,多个值使用逗号分隔 +# 当前默认不对IP脱敏;如后期需要开启,可直接追加 ip,ipaddr,oper_ip,login_ip 并重启服务 +LOG_PARTIAL_MASK_FIELDS = 'phonenumber,phone,mobile,email' +# 按配置键名识别敏感配置的关键词,多个值使用逗号分隔 +LOG_CONFIG_SECRET_PATTERNS = 'password,token,secret,key,private,credential,access,jwt,captcha,sms' # Redis Stream Key LOG_STREAM_KEY = 'log:stream' # Redis Stream 消费组名称 diff --git a/ruoyi-fastapi-backend/common/annotation/log_annotation.py b/ruoyi-fastapi-backend/common/annotation/log_annotation.py index 769409c..ce096d7 100644 --- a/ruoyi-fastapi-backend/common/annotation/log_annotation.py +++ b/ruoyi-fastapi-backend/common/annotation/log_annotation.py @@ -2,7 +2,9 @@ import json import time from collections.abc import Awaitable, Callable +from copy import deepcopy from datetime import datetime +from enum import Enum from functools import wraps from typing import Any, Literal, TypeVar @@ -22,36 +24,131 @@ from module_admin.service.log_service import LogQueueService from utils.client_ip_util import ClientIPUtil from utils.dependency_util import DependencyUtil -from utils.log_util import logger +from utils.log_util import LogSanitizer, logger from utils.response_util import ResponseUtil P = ParamSpec('P') R = TypeVar('R') +class _LogFieldRoot(str, Enum): + """ + 日志字段路径根节点 + """ + + def field(self, *parts: str) -> str: + """ + 生成 include 字段路径 + + :param parts: 后续字段路径片段 + :return: 完整字段路径 + """ + return '.'.join((self.value, *parts)) if parts else self.value + + +class RequestLogFieldRoot(_LogFieldRoot): + """ + 请求日志字段路径支持的根节点 + """ + + PATH_PARAMS = 'path_params' + QUERY_PARAMS = 'query_params' + JSON_BODY = 'json_body' + FORM_DATA = 'form_data' + FILES = 'files' + RAW_BODY = 'raw_body' + + +class ResponseLogFieldRoot(_LogFieldRoot): + """ + 响应日志字段路径推荐的根节点 + """ + + CODE = 
'code' + MSG = 'msg' + DATA = 'data' + ROWS = 'rows' + SUCCESS = 'success' + TIME = 'time' + + class Log: """ 日志装饰器 + + 支持的日志模式: + - `full`: 记录脱敏后的完整载荷 + - `none`: 不记录对应方向的载荷 + - `summary`: 仅记录摘要信息,如顶层键、状态码、rows数量等 + - `include`: 仅记录白名单字段,适合高敏感接口 + - `exclude`: 记录完整载荷后排除少数字段,适合中敏感接口 + + 模式建议: + - 普通后台管理接口可使用`full` + - 包含大量字段但只关心结构时使用`summary` + - 包含密钥、密码、提示词、配置项等高敏感字段时优先使用`include` + - 字段较多但只需排除极少数字段时可使用`exclude` + - 完全无需保留请求体或响应体时使用`none` + + include / exclude 模式字段路径规则: + - 使用`.`分隔路径,如`json_body.modelCode`、`data.userId` + - 请求日志推荐根节点:`path_params`、`query_params`、`json_body`、`form_data`、`files`、`raw_body` + - 响应日志推荐根节点:`code`、`msg`、`data`、`rows`、`success`、`time` + - 字段名优先按原样精确匹配,若未命中会自动尝试 snake_case、camelCase、kebab-case 归一化匹配 + - 数组使用数字下标,如`rows.0.userName` + - 当前不支持通配符 + - 推荐优先使用`RequestLogFieldRoot.JSON_BODY.field(...)`和`ResponseLogFieldRoot.DATA.field(...)`构造字段路径 + + 示例: + - `request_include_fields=(RequestLogFieldRoot.JSON_BODY.field('model_code'),)` + - `response_include_fields=(ResponseLogFieldRoot.DATA.field('userName'),)` + - `request_exclude_fields=('json_body.api_key',)` """ + _REQUEST_INCLUDE_ROOTS = tuple(item.value for item in RequestLogFieldRoot) + _RESPONSE_INCLUDE_ROOTS = tuple(item.value for item in ResponseLogFieldRoot) + _MISSING = object() + _AMBIGUOUS = object() + def __init__( self, title: str, business_type: BusinessType, log_type: Literal['login', 'operation'] | None = 'operation', + request_log_mode: Literal['full', 'none', 'summary', 'include', 'exclude'] = 'full', + response_log_mode: Literal['full', 'none', 'summary', 'include', 'exclude'] = 'full', + request_include_fields: tuple[str, ...] | None = None, + response_include_fields: tuple[str, ...] | None = None, + request_exclude_fields: tuple[str, ...] | None = None, + response_exclude_fields: tuple[str, ...] 
| None = None, ) -> None: """ 日志装饰器 :param title: 当前日志装饰器装饰的模块标题 :param business_type: 业务类型(OTHER其它 INSERT新增 UPDATE修改 DELETE删除 GRANT授权 EXPORT导出 IMPORT导入 FORCE强退 GENCODE生成代码 CLEAN清空数据) - :param log_type: 日志类型(login表示登录日志,operation表示为操作日志) + :param log_type: 日志类型;`login`表示登录日志,仅落登录信息,`operation`表示操作日志,会落请求/响应摘要与操作人信息 + :param request_log_mode: 请求日志记录模式;`full`记录脱敏后的完整请求,`none`不记录请求体,`summary`记录请求摘要,`include`仅记录request_include_fields指定的字段,`exclude`记录完整请求后排除request_exclude_fields指定的字段 + :param response_log_mode: 响应日志记录模式;`full`记录脱敏后的完整响应,`none`不记录响应体,`summary`记录响应摘要,`include`仅记录response_include_fields指定的字段,`exclude`记录完整响应后排除response_exclude_fields指定的字段 + :param request_include_fields: 请求日志白名单字段路径,仅在request_log_mode='include'时生效;推荐从path_params/query_params/json_body/form_data/files/raw_body开始,字段名支持 snake_case 与 camelCase 自动兼容 + :param response_include_fields: 响应日志白名单字段路径,仅在response_log_mode='include'时生效;推荐从code/msg/data/rows/success/time开始,字段名支持 snake_case 与 camelCase 自动兼容 + :param request_exclude_fields: 请求日志排除字段路径,仅在request_log_mode='exclude'时生效;推荐从path_params/query_params/json_body/form_data/files/raw_body开始,字段名支持 snake_case 与 camelCase 自动兼容 + :param response_exclude_fields: 响应日志排除字段路径,仅在response_log_mode='exclude'时生效;推荐从code/msg/data/rows/success/time开始,字段名支持 snake_case 与 camelCase 自动兼容 :return: """ self.title = title self.business_type = business_type.value self.log_type = log_type + self.request_log_mode = request_log_mode + self.response_log_mode = response_log_mode + self.request_include_fields = request_include_fields or () + self.response_include_fields = response_include_fields or () + self.request_exclude_fields = request_exclude_fields or () + self.response_exclude_fields = response_exclude_fields or () self._oper_param_len = 2000 + self._warned_field_path_warnings: set[str] = set() + self._validate_request_field_paths_strict() + self._warn_invalid_field_path_config() def __call__(self, func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: 
@wraps(func) @@ -64,7 +161,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: request = get_function_parameters_value_by_name(func, request_name_list[0], *args, **kwargs) DependencyUtil.check_exclude_routes(request, err_msg='当前路由不在认证规则内,不可使用Log装饰器') request_method = request.method - user_agent = request.headers.get('User-Agent') + user_agent = request.headers.get('User-Agent') or '' # 获取操作类型 operator_type = self._get_oper_type(user_agent) # 获取请求的url @@ -74,7 +171,14 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # 获取请求ip归属区域 oper_location = await self._get_oper_location(oper_ip) # 获取请求参数 - oper_param = await self._get_request_params(request) + oper_param_payload = LogSanitizer.sanitize_data(await self._get_request_params(request)) + oper_param = self._build_log_text( + oper_param_payload, + self.request_log_mode, + self.request_include_fields, + self.request_exclude_fields, + payload_kind='request', + ) # 日志表请求参数字段长度最大为2000,因此在此处判断长度 if len(oper_param) > self._oper_param_len: oper_param = '请求参数过长' @@ -100,10 +204,18 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # 判断请求是否来自api文档 request_from_swagger, request_from_redoc = self._is_request_from_swagger_or_redoc(request) # 根据响应结果的类型使用不同的方法获取响应结果参数 - result_dict = self._get_result_dict(result, request_from_swagger, request_from_redoc) - json_result = json.dumps(result_dict, ensure_ascii=False) + sanitized_result_dict = LogSanitizer.sanitize_data( + self._get_result_dict(result, request_from_swagger, request_from_redoc) + ) + json_result = self._build_log_text( + sanitized_result_dict, + self.response_log_mode, + self.response_include_fields, + self.response_exclude_fields, + payload_kind='response', + ) # 根据响应结果获取响应状态及异常信息 - status, error_msg = self._get_status_and_error_msg(result_dict) + status, error_msg = self._get_status_and_error_msg(sanitized_result_dict) # 根据日志类型向对应的日志表插入数据 if self.log_type == 'login': # 登录请求来自于api文档时不记录登录日志,其余情况则记录 @@ -116,7 +228,7 @@ async def 
wrapper(*args: P.args, **kwargs: P.kwargs) -> R: 'loginTime': oper_time, 'userName': user.username, 'status': str(status), - 'msg': result_dict.get('msg'), + 'msg': sanitized_result_dict.get('msg') or '', } ) @@ -172,10 +284,11 @@ def _get_oper_type(self, user_agent: Any) -> int: :param user_agent: 用户代理字符串 :return: 操作类型 """ + user_agent_text = user_agent or '' operator_type = 0 - if 'Windows' in user_agent or 'Macintosh' in user_agent or 'Linux' in user_agent: + if 'Windows' in user_agent_text or 'Macintosh' in user_agent_text or 'Linux' in user_agent_text: operator_type = 1 - if 'Mobile' in user_agent or 'Android' in user_agent or 'iPhone' in user_agent: + if 'Mobile' in user_agent_text or 'Android' in user_agent_text or 'iPhone' in user_agent_text: operator_type = 2 return operator_type @@ -193,12 +306,12 @@ async def _get_oper_location(self, oper_ip: str) -> str: return oper_location - async def _get_request_params(self, request: Request) -> str: + async def _get_request_params(self, request: Request) -> dict[str, Any]: """ 获取请求参数 :param request: Request对象 - :return: 格式化后的请求参数字符串 + :return: 结构化请求参数 """ params = {} @@ -212,40 +325,568 @@ async def _get_request_params(self, request: Request) -> str: # JSON请求 if 'application/json' in content_type: - json_body = await request.json() - if json_body: - params['json_body'] = json_body + params.update(await self._get_json_request_params(request, content_type)) # 表单数据 elif 'multipart/form-data' in content_type or 'application/x-www-form-urlencoded' in content_type: - form_data = await request.form() - if form_data: - # 过滤掉文件对象,只保留普通表单字段 - form_dict = {key: value for key, value in form_data.items() if not hasattr(value, 'filename')} - if form_dict: - params['form_data'] = form_dict - - # 仅在multipart时尝试处理文件 - if 'multipart/form-data' in content_type: - file_info = {} - for key, value in form_data.items(): - if hasattr(value, 'filename'): - file_info[key] = { - 'filename': value.filename, - 'content_type': 
value.content_type, - 'size': value.size, - 'headers': dict(value.headers), - } - if file_info: - params['files'] = file_info + params.update(await self._get_form_request_params(request, content_type)) # 其他文本请求 elif 'application/octet-stream' not in content_type: - body = await request.body() - if body: - params['raw_body'] = body.decode('utf-8') + params.update(await self._get_raw_request_params(request)) + + return params + + async def _get_json_request_params(self, request: Request, content_type: str) -> dict[str, Any]: + """ + 获取 JSON 请求参数,解析失败时自动降级为原始文本 + + :param request: Request对象 + :param content_type: 请求头中的 content-type + :return: 结构化请求参数 + """ + params = {} + try: + json_body = await request.json() + except Exception as exc: + logger.warning( + 'Log装饰器请求体解析失败,已降级为raw_body记录,path={}, content_type={}, error_type={}', + request.url.path, + content_type, + type(exc).__name__, + ) + params.update(await self._get_raw_request_params(request)) + else: + if json_body: + params['json_body'] = json_body + return params + + async def _get_form_request_params(self, request: Request, content_type: str) -> dict[str, Any]: + """ + 获取表单请求参数 + + :param request: Request对象 + :param content_type: 请求头中的 content-type + :return: 结构化请求参数 + """ + params = {} + form_data = await request.form() + if not form_data: + return params + + # 过滤掉文件对象,只保留普通表单字段 + form_dict = {key: value for key, value in form_data.items() if not hasattr(value, 'filename')} + if form_dict: + params['form_data'] = form_dict + + # 仅在multipart时尝试处理文件 + if 'multipart/form-data' not in content_type: + return params + + file_info = {} + for key, value in form_data.items(): + if hasattr(value, 'filename'): + file_info[key] = { + 'filename': value.filename, + 'content_type': value.content_type, + 'size': value.size, + 'headers': dict(value.headers), + } + if file_info: + params['files'] = file_info + return params + + async def _get_raw_request_params(self, request: Request) -> dict[str, Any]: + """ + 获取原始文本请求参数 + + 
:param request: Request对象 + :return: 原始文本请求参数 + """ + body = await request.body() + if not body: + return {} + return {'raw_body': self._decode_request_body(body)} + + @staticmethod + def _decode_request_body(body: bytes) -> str: + """ + 安全解码请求体,避免日志采集影响主流程 + + :param body: 原始请求体字节 + :return: 解码后的请求体文本 + """ + return body.decode('utf-8', errors='replace') + + def _build_log_text( + self, + payload: Any, + mode: Literal['full', 'none', 'summary', 'include', 'exclude'], + include_fields: tuple[str, ...], + exclude_fields: tuple[str, ...], + payload_kind: Literal['request', 'response'], + ) -> str: + """ + 根据日志策略构建日志文本 + + :param payload: 已完成脱敏的日志载荷 + :param mode: 日志记录模式 + :param include_fields: 白名单字段路径 + :param exclude_fields: 排除字段路径 + :param payload_kind: 载荷类型 + :return: 日志文本 + """ + if mode == 'none' or not payload: + return '' + if mode == 'summary': + log_payload = self._build_summary_payload(payload, payload_kind) + elif mode == 'include': + log_payload = self._extract_include_fields(payload, include_fields, payload_kind) + elif mode == 'exclude': + log_payload = self._exclude_fields(payload, exclude_fields, payload_kind) + else: + log_payload = payload + return json.dumps(log_payload, ensure_ascii=False, indent=2) if log_payload else '' + + def _build_summary_payload(self, payload: Any, payload_kind: Literal['request', 'response']) -> dict[str, Any]: + """ + 构建摘要日志载荷 + + :param payload: 原始载荷 + :param payload_kind: 载荷类型 + :return: 摘要日志载荷 + """ + summary_payload: dict[str, Any] = { + 'mode': 'summary', + 'kind': payload_kind, + } + if not isinstance(payload, dict): + summary_payload['type'] = type(payload).__name__ + return summary_payload + summary_payload['keys'] = list(payload.keys()) + if payload_kind == 'request': + summary_payload.update( + { + 'path_param_keys': self._get_mapping_keys(payload.get('path_params')), + 'query_param_keys': self._get_mapping_keys(payload.get('query_params')), + 'json_body_keys': self._get_mapping_keys(payload.get('json_body')), 
+ 'form_data_keys': self._get_mapping_keys(payload.get('form_data')), + 'file_fields': self._get_mapping_keys(payload.get('files')), + 'raw_body_length': len(payload.get('raw_body', '')) if payload.get('raw_body') else 0, + } + ) + else: + summary_payload.update( + { + 'code': payload.get('code'), + 'msg': self._get_result_message(payload), + 'data_keys': self._get_mapping_keys(payload.get('data')), + 'rows_count': len(payload.get('rows')) if isinstance(payload.get('rows'), list) else 0, + } + ) + return summary_payload + + def _extract_include_fields( + self, + payload: Any, + include_fields: tuple[str, ...], + payload_kind: Literal['request', 'response'], + ) -> dict[str, Any]: + """ + 提取白名单字段日志载荷 + + :param payload: 原始载荷 + :param include_fields: 白名单字段路径 + :param payload_kind: 载荷类型 + :return: 提取后的日志载荷 + """ + selected_fields = {} + for field_path in include_fields: + field_value = self._get_field_value_by_path(payload, field_path) + if field_value is not self._MISSING: + selected_fields[field_path] = field_value + else: + self._warn_missing_field_path(payload, field_path, payload_kind, strategy='include') + return { + 'mode': 'include', + 'selected': selected_fields, + } + + def _exclude_fields( + self, + payload: Any, + exclude_fields: tuple[str, ...], + payload_kind: Literal['request', 'response'], + ) -> Any: + """ + 排除指定字段后返回日志载荷 + + :param payload: 原始载荷 + :param exclude_fields: 排除字段路径 + :param payload_kind: 载荷类型 + :return: 排除后的日志载荷 + """ + if not exclude_fields or not isinstance(payload, (dict, list)): + return payload + filtered_payload = deepcopy(payload) + for field_path in self._sort_field_paths_for_exclude(exclude_fields): + if not self._remove_field_by_path(filtered_payload, field_path): + self._warn_missing_field_path(payload, field_path, payload_kind, strategy='exclude') + return filtered_payload + + def _warn_missing_field_path( + self, + payload: Any, + field_path: str, + payload_kind: Literal['request', 'response'], + strategy: Literal['include', 
'exclude'], + ) -> None: + """ + 记录未命中的字段路径告警,同一路径仅提示一次 + + :param payload: 原始载荷 + :param field_path: 字段路径 + :param payload_kind: 载荷类型 + :param strategy: 当前字段路径策略 + :return: None + """ + warning_key = f'{strategy}:{payload_kind}:{field_path}' + if warning_key in self._warned_field_path_warnings: + return + self._warned_field_path_warnings.add(warning_key) + kind_text = '请求' if payload_kind == 'request' else '响应' + reason = self._describe_missing_field_path(payload, field_path) + if strategy == 'include': + logger.warning( + f'Log装饰器字段白名单未命中:{kind_text}日志字段路径`{field_path}`已忽略。' + f'{reason}若该字段为可选字段,可忽略此提示。' + ) + else: + logger.warning( + f'Log装饰器字段排除路径未命中:{kind_text}日志字段路径`{field_path}`未生效。' + f'{reason}若该字段为可选字段,可忽略此提示。' + ) + + def _describe_missing_field_path(self, payload: Any, field_path: str) -> str: + """ + 描述字段路径未命中的原因 + + :param payload: 原始载荷 + :param field_path: 字段路径 + :return: 原因描述 + """ + current_value = payload + traversed_parts: list[str] = [] + for part in field_path.split('.'): + traversed_path = '.'.join(traversed_parts) or '' + if isinstance(current_value, dict): + mapping_value = self._get_mapping_value_by_part(current_value, part) + if mapping_value is self._MISSING: + available_keys = ', '.join(map(str, current_value.keys())) if current_value else '无' + return f'在`{traversed_path}`下未找到字段`{part}`,可用字段:{available_keys}。' + if mapping_value is self._AMBIGUOUS: + ambiguous_keys = ', '.join( + str(key) + for key in current_value + if self._normalize_include_key(str(key)) == self._normalize_include_key(part) + ) + return ( + f'在`{traversed_path}`下字段`{part}`存在命名冲突,' + f'可匹配字段:{ambiguous_keys};请改用精确字段名。' + ) + current_value = mapping_value + elif isinstance(current_value, list): + if not part.isdigit(): + return f'在`{traversed_path}`处当前值为列表,字段片段`{part}`应为数字下标。' + current_index = int(part) + if current_index >= len(current_value): + return f'在`{traversed_path}`处列表长度为{len(current_value)},下标`{part}`越界。' + current_value = current_value[current_index] + 
else: + current_type = type(current_value).__name__ if current_value is not None else 'None' + return f'在`{traversed_path}`处当前值类型为`{current_type}`,无法继续匹配后续路径。' + traversed_parts.append(part) + return '' + + def _warn_invalid_field_path_config(self) -> None: + """ + 记录字段路径配置告警,帮助开发者发现路径误配 + + :return: None + """ + warnings = [ + *self._collect_field_path_warnings( + mode=self.request_log_mode, + include_fields=self.request_include_fields, + exclude_fields=self.request_exclude_fields, + payload_kind='request', + ), + *self._collect_field_path_warnings( + mode=self.response_log_mode, + include_fields=self.response_include_fields, + exclude_fields=self.response_exclude_fields, + payload_kind='response', + ), + ] + for warning in warnings: + logger.warning(f'Log装饰器字段路径配置提示:{warning}') + + def _validate_request_field_paths_strict(self) -> None: + """ + 严格校验请求日志字段路径根节点 - return json.dumps(params, ensure_ascii=False, indent=2) if params else '' + :return: None + """ + field_paths = () + if self.request_log_mode == 'include': + field_paths = self.request_include_fields + elif self.request_log_mode == 'exclude': + field_paths = self.request_exclude_fields + for field_path in field_paths: + if not field_path: + continue + root_part = field_path.split('.', 1)[0] + if self._resolve_include_root(root_part, self._REQUEST_INCLUDE_ROOTS) is None: + raise ValueError( + f'请求日志字段路径`{field_path}`使用了不支持的根节点`{root_part}`;' + f'仅支持:{", ".join(self._REQUEST_INCLUDE_ROOTS)}' + ) + + def _collect_field_path_warnings( + self, + mode: Literal['full', 'none', 'summary', 'include', 'exclude'], + include_fields: tuple[str, ...], + exclude_fields: tuple[str, ...], + payload_kind: Literal['request', 'response'], + ) -> list[str]: + """ + 收集字段路径配置告警信息 + + :param mode: 日志记录模式 + :param include_fields: 白名单字段路径列表 + :param exclude_fields: 排除字段路径列表 + :param payload_kind: 载荷类型 + :return: 告警信息列表 + """ + warnings = [] + kind_text = '请求' if payload_kind == 'request' else '响应' + recommended_roots = 
self._REQUEST_INCLUDE_ROOTS if payload_kind == 'request' else self._RESPONSE_INCLUDE_ROOTS + if mode == 'include' and not include_fields: + warnings.append( + f'{kind_text}日志已启用include模式,但未配置白名单字段;推荐根节点:{", ".join(recommended_roots)}' + ) + if mode == 'exclude' and not exclude_fields: + warnings.append( + f'{kind_text}日志已启用exclude模式,但未配置排除字段;推荐根节点:{", ".join(recommended_roots)}' + ) + if mode != 'include' and include_fields: + warnings.append(f'{kind_text}日志配置了白名单字段,但当前模式为{mode},这些字段不会生效') + if mode != 'exclude' and exclude_fields: + warnings.append(f'{kind_text}日志配置了排除字段,但当前模式为{mode},这些字段不会生效') + for field_path in include_fields: + path_warning = self._validate_field_path(field_path, payload_kind, strategy='include') + if path_warning: + warnings.append(path_warning) + for field_path in exclude_fields: + path_warning = self._validate_field_path(field_path, payload_kind, strategy='exclude') + if path_warning: + warnings.append(path_warning) + + return warnings + + def _validate_field_path( + self, + field_path: str, + payload_kind: Literal['request', 'response'], + strategy: Literal['include', 'exclude'], + ) -> str | None: + """ + 校验单个字段路径 + + :param field_path: 字段路径 + :param payload_kind: 载荷类型 + :param strategy: 当前字段路径策略 + :return: 告警信息 + """ + kind_text = '请求' if payload_kind == 'request' else '响应' + recommended_roots = self._REQUEST_INCLUDE_ROOTS if payload_kind == 'request' else self._RESPONSE_INCLUDE_ROOTS + strategy_text = '白名单字段路径' if strategy == 'include' else '排除字段路径' + if not field_path: + return f'{kind_text}日志存在空{strategy_text};推荐根节点:{", ".join(recommended_roots)}' + parts = field_path.split('.') + if any(not part for part in parts): + return f'{kind_text}日志{strategy_text}`{field_path}`格式不合法,请使用`.`分隔的完整路径' + canonical_root = self._resolve_include_root(parts[0], recommended_roots) + if canonical_root is None: + return ( + f'{kind_text}日志{strategy_text}`{field_path}`未使用推荐根节点`{parts[0]}`;' + f'推荐根节点:{", ".join(recommended_roots)}' + ) + return None + + 
def _get_field_value_by_path(self, payload: Any, field_path: str) -> Any: + """ + 通过字段路径获取字段值 + + :param payload: 原始载荷 + :param field_path: 字段路径 + :return: 字段值 + """ + current_value = payload + for part in field_path.split('.'): + if isinstance(current_value, dict): + mapping_value = self._get_mapping_value_by_part(current_value, part) + if mapping_value is self._MISSING or mapping_value is self._AMBIGUOUS: + return self._MISSING + current_value = mapping_value + elif isinstance(current_value, list) and part.isdigit(): + current_index = int(part) + if current_index >= len(current_value): + return self._MISSING + current_value = current_value[current_index] + else: + return self._MISSING + return current_value + + def _get_mapping_value_by_part(self, payload: dict[str, Any], part: str) -> Any: + """ + 从字典中按字段片段获取值,支持 snake_case / camelCase / kebab-case 自动兼容 + + :param payload: 当前字典载荷 + :param part: 当前路径片段 + :return: 字段值 + """ + if part in payload: + return payload[part] + normalized_part = self._normalize_include_key(part) + matched_keys = [key for key in payload if self._normalize_include_key(str(key)) == normalized_part] + if len(matched_keys) == 1: + return payload[matched_keys[0]] + if len(matched_keys) > 1: + return self._AMBIGUOUS + return self._MISSING + + @staticmethod + def _sort_field_paths_for_exclude(field_paths: tuple[str, ...]) -> list[str]: + """ + 对 exclude 字段路径排序,优先处理更深层路径和更大的列表下标,避免列表删除时发生索引位移 + + :param field_paths: 原始字段路径 + :return: 排序后的字段路径列表 + """ + return sorted(field_paths, key=Log._build_exclude_sort_key, reverse=True) + + @staticmethod + def _build_exclude_sort_key(field_path: str) -> tuple[int, tuple[tuple[int, int | str], ...]]: + """ + 构建 exclude 字段路径排序键 + + :param field_path: 字段路径 + :return: 排序键 + """ + parts = field_path.split('.') + normalized_parts: tuple[tuple[int, int | str], ...] 
= tuple( + (1, int(part)) if part.isdigit() else (0, part) for part in parts + ) + return len(parts), normalized_parts + + def _remove_field_by_path(self, payload: Any, field_path: str) -> bool: + """ + 按路径移除字段 + + :param payload: 原始载荷 + :param field_path: 字段路径 + :return: 是否移除成功 + """ + if not field_path: + return False + current_value = payload + parts = field_path.split('.') + for part in parts[:-1]: + if isinstance(current_value, dict): + mapping_value = self._get_mapping_value_by_part(current_value, part) + if mapping_value is self._MISSING or mapping_value is self._AMBIGUOUS: + return False + current_value = mapping_value + elif isinstance(current_value, list) and part.isdigit(): + current_index = int(part) + if current_index >= len(current_value): + return False + current_value = current_value[current_index] + else: + return False + + target_part = parts[-1] + if isinstance(current_value, dict): + resolved_key = self._resolve_mapping_key_by_part(current_value, target_part) + if resolved_key is self._MISSING or resolved_key is self._AMBIGUOUS: + return False + del current_value[resolved_key] + return True + if isinstance(current_value, list) and target_part.isdigit(): + current_index = int(target_part) + if current_index >= len(current_value): + return False + current_value.pop(current_index) + return True + return False + + def _resolve_mapping_key_by_part(self, payload: dict[str, Any], part: str) -> str | object: + """ + 解析字段片段在字典中的真实键名 + + :param payload: 当前字典载荷 + :param part: 当前路径片段 + :return: 真实键名或哨兵值 + """ + if part in payload: + return part + normalized_part = self._normalize_include_key(part) + matched_keys = [key for key in payload if self._normalize_include_key(str(key)) == normalized_part] + if len(matched_keys) == 1: + return matched_keys[0] + if len(matched_keys) > 1: + return self._AMBIGUOUS + return self._MISSING + + @staticmethod + def _resolve_include_root( + root: RequestLogFieldRoot | ResponseLogFieldRoot | str, candidates: tuple[str, ...] 
+ ) -> str | None: + """ + 解析 include 根节点到标准写法 + + :param root: 原始根节点 + :param candidates: 候选根节点 + :return: 标准根节点 + """ + normalized_root = Log._normalize_include_key(str(root)) + for candidate in candidates: + if Log._normalize_include_key(candidate) == normalized_root: + return candidate + return None + + @staticmethod + def _normalize_include_key(field_name: str) -> str: + """ + 标准化 include 字段名 + + :param field_name: 原始字段名 + :return: 标准化后的字段名 + """ + return ''.join(char.lower() for char in field_name if char.isalnum()) + + @staticmethod + def _get_mapping_keys(payload: Any) -> list[str]: + """ + 获取字典载荷的键列表 + + :param payload: 原始载荷 + :return: 键列表 + """ + if isinstance(payload, dict): + return list(payload.keys()) + return [] def _get_login_log( self, user_agent: Any, oper_ip: str, oper_location: str, oper_time: datetime, origin_kwargs: dict @@ -262,7 +903,7 @@ def _get_login_log( """ login_log = {} if self.log_type == 'login': - user_agent_info = parse(user_agent) + user_agent_info = parse(user_agent or '') browser = f'{user_agent_info.browser.family}' system_os = f'{user_agent_info.os.family}' if user_agent_info.browser.version != (): @@ -303,10 +944,20 @@ def _get_status_and_error_msg(self, result_dict: dict) -> tuple[int, str]: if result_dict.get('code') == HTTP_200_OK: status = 0 else: - error_msg = result_dict.get('msg') + error_msg = self._get_result_message(result_dict) return status, error_msg + @staticmethod + def _get_result_message(result_dict: dict[str, Any]) -> Any: + """ + 获取响应结果中的消息字段,兼容 msg / message 两种写法 + + :param result_dict: 操作结果字典 + :return: 消息内容 + """ + return result_dict.get('msg') if result_dict.get('msg') is not None else result_dict.get('message') + def _is_request_from_swagger_or_redoc(self, request: Request) -> tuple[bool, bool]: """ 判断请求是否来自swagger或redoc diff --git a/ruoyi-fastapi-backend/config/env.py b/ruoyi-fastapi-backend/config/env.py index b631520..3dd6604 100644 --- a/ruoyi-fastapi-backend/config/env.py +++ 
b/ruoyi-fastapi-backend/config/env.py @@ -84,6 +84,15 @@ class LogSettings(BaseSettings): 日志与队列配置 """ + log_mask_enabled: bool = True + log_mask_placeholder: str = '******' + log_mask_fields: str = ( + 'password,old_password,new_password,confirm_password,api_key,token,access_token,refresh_token,' + 'authorization,client_secret,secret,secret_key,private_key,private_key_pem,credential,credentials,' + 'sms_code,captcha_code,system_prompt' + ) + log_partial_mask_fields: str = 'phonenumber,phone,mobile,email' + log_config_secret_patterns: str = 'password,token,secret,key,private,credential,access,jwt,captcha,sms' log_stream_key: str = 'log:stream' log_stream_group: str = 'log_aggregator' log_stream_consumer_prefix: str = 'worker' @@ -291,7 +300,7 @@ def parse_cli_args() -> None: parser = argparse.ArgumentParser(description='命令行参数') parser.add_argument('--env', type=str, default='', help='运行环境') # 解析命令行参数 - args = parser.parse_args() + args, _ = parser.parse_known_args() # 设置环境变量,如果未设置命令行参数,默认APP_ENV为dev os.environ['APP_ENV'] = args.env if args.env else 'dev' # 读取运行环境 diff --git a/ruoyi-fastapi-backend/module_admin/service/log_service.py b/ruoyi-fastapi-backend/module_admin/service/log_service.py index 4ffbb23..3560f39 100644 --- a/ruoyi-fastapi-backend/module_admin/service/log_service.py +++ b/ruoyi-fastapi-backend/module_admin/service/log_service.py @@ -26,7 +26,7 @@ ) from module_admin.service.dict_service import DictDataService from utils.excel_util import ExcelUtil -from utils.log_util import logger +from utils.log_util import LogSanitizer, logger class OperationLogService: @@ -332,7 +332,7 @@ async def enqueue_login_log(cls, request: Request, login_log: LogininforModel, s :param source: 日志来源 :return: None """ - payload = login_log.model_dump(by_alias=True, exclude_none=True) + payload = LogSanitizer.sanitize_data(login_log.model_dump(by_alias=True, exclude_none=True)) await cls._xadd_event(request.app.state.redis, 'login', payload, source) @classmethod @@ -345,7 
+345,7 @@ async def enqueue_operation_log(cls, request: Request, operation_log: OperLogMod :param source: 日志来源 :return: None """ - payload = operation_log.model_dump(by_alias=True, exclude_none=True) + payload = LogSanitizer.sanitize_data(operation_log.model_dump(by_alias=True, exclude_none=True)) await cls._xadd_event(request.app.state.redis, 'operation', payload, source) diff --git a/ruoyi-fastapi-backend/module_ai/controller/ai_model_controller.py b/ruoyi-fastapi-backend/module_ai/controller/ai_model_controller.py index 839d09c..60292ab 100644 --- a/ruoyi-fastapi-backend/module_ai/controller/ai_model_controller.py +++ b/ruoyi-fastapi-backend/module_ai/controller/ai_model_controller.py @@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from common.annotation.cache_annotation import ApiCache, ApiCacheEvict -from common.annotation.log_annotation import Log +from common.annotation.log_annotation import Log, RequestLogFieldRoot from common.aspect.data_scope import DataScopeDependency from common.aspect.db_seesion import DBSessionDependency from common.aspect.interface_auth import UserInterfaceAuthDependency @@ -82,7 +82,12 @@ async def get_ai_model_all( ) @ValidateFields(validate_model='add_ai_model') @ApiCacheEvict(namespaces=ApiGroup.AI_MODEL_MUTATION) -@Log(title='AI模型管理', business_type=BusinessType.INSERT) +@Log( + title='AI模型管理', + business_type=BusinessType.INSERT, + request_log_mode='exclude', + request_exclude_fields=(RequestLogFieldRoot.JSON_BODY.field('api_key'),), +) async def add_ai_model( request: Request, add_ai_model: AiModelModel, @@ -110,7 +115,12 @@ async def add_ai_model( ) @ValidateFields(validate_model='edit_ai_model') @ApiCacheEvict(namespaces=ApiGroup.AI_MODEL_MUTATION) -@Log(title='AI模型管理', business_type=BusinessType.UPDATE) +@Log( + title='AI模型管理', + business_type=BusinessType.UPDATE, + request_log_mode='exclude', + request_exclude_fields=(RequestLogFieldRoot.JSON_BODY.field('api_key'),), +) async def edit_ai_model( request: 
Request, edit_ai_model: AiModelModel, diff --git a/ruoyi-fastapi-backend/tests/test_log_sanitize_util.py b/ruoyi-fastapi-backend/tests/test_log_sanitize_util.py new file mode 100644 index 0000000..d09c45f --- /dev/null +++ b/ruoyi-fastapi-backend/tests/test_log_sanitize_util.py @@ -0,0 +1,772 @@ +import asyncio +import json +import os +import sys +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import patch + +from fastapi import Request +from loguru import logger as _logger + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from common.annotation.log_annotation import Log, RequestLogFieldRoot, ResponseLogFieldRoot +from common.enums import BusinessType +from config.env import LogConfig +from utils.log_util import LoggerInitializer, LogSanitizer, _build_text_assignment_patterns, _build_text_key_pattern + + +def test_sanitize_nested_log_payload() -> None: + payload = { + 'jsonBody': { + 'password': 'plain-password', + 'apiKey': 'sk-123456', + 'systemPrompt': 'top secret prompt', + 'phonenumber': '13812345678', + 'email': 'admin@example.com', + 'ipaddr': '192.168.1.10', + } + } + + sanitized = LogSanitizer.sanitize_data(payload) + + assert sanitized['jsonBody']['password'] == '******' + assert sanitized['jsonBody']['apiKey'] == '******' + assert sanitized['jsonBody']['systemPrompt'] == '******' + assert sanitized['jsonBody']['phonenumber'] == '138****5678' + assert sanitized['jsonBody']['email'] == 'a***n@example.com' + assert sanitized['jsonBody']['ipaddr'] == '192.168.1.10' + + +def test_ip_masking_can_be_enabled_by_partial_mask_field_configuration() -> None: + payload = { + 'jsonBody': { + 'ipaddr': '192.168.1.10', + 'login_ip': '192.168.1.11', + } + } + + with patch.object( + LogSanitizer, + '_PARTIAL_MASK_FIELDS', + LogSanitizer._PARTIAL_MASK_FIELDS | {'ip', 'ipaddr', 'operip', 'loginip'}, + ): + sanitized = LogSanitizer.sanitize_data(payload) + + assert sanitized['jsonBody']['ipaddr'] == '192.168.1.*' + 
assert sanitized['jsonBody']['login_ip'] == '192.168.1.*' + + +def test_sanitize_config_value_by_config_key() -> None: + payload = { + 'configKey': 'sys.transport.privateKey', + 'configValue': '-----BEGIN PRIVATE KEY-----abc', + } + + sanitized = LogSanitizer.sanitize_data(payload) + + assert sanitized['configValue'] == '******' + + +def test_sanitize_text_message() -> None: + text = 'Authorization=Bearer abc.def password=123456 api_key=sk-test' + expected_mask_count = 3 + + sanitized = LogSanitizer.sanitize_text(text) + + assert 'abc.def' not in sanitized + assert '123456' not in sanitized + assert 'sk-test' not in sanitized + assert sanitized.count('******') >= expected_mask_count + + +def test_sanitize_text_masks_configured_secret_fields() -> None: + text = ( + 'secret_key=my-secret private_key="-----BEGIN PRIVATE KEY-----abc" ' + "credential='cred-123' credentials=cred-list" + ) + expected_mask_count = 4 + + sanitized = LogSanitizer.sanitize_text(text) + + assert 'my-secret' not in sanitized + assert '-----BEGIN PRIVATE KEY-----abc' not in sanitized + assert 'cred-123' not in sanitized + assert 'cred-list' not in sanitized + assert sanitized.count('******') >= expected_mask_count + + +def test_sanitize_stringified_json_preserves_newlines_after_masking() -> None: + text = '{\n "password": "123456",\n "userName": "admin"\n}' + + sanitized = LogSanitizer.sanitize_data({'operParam': text}) + + assert sanitized['operParam'] == '{\n "password": "******",\n "userName": "admin"\n}' + + +def test_sanitize_text_masks_configured_partial_fields() -> None: + text = 'email=admin@example.com phonenumber=13812345678 mobile="13812345679"' + + sanitized = LogSanitizer.sanitize_text(text) + + assert 'admin@example.com' not in sanitized + assert '13812345678' not in sanitized + assert '13812345679' not in sanitized + assert 'a***n@example.com' in sanitized + assert '138****5678' in sanitized + assert '138****5679' in sanitized + + +def 
test_sanitize_text_can_enable_ip_masking_by_partial_mask_field_configuration() -> None: + text = 'login_ip=192.168.1.10 ipaddr="192.168.1.11"' + ip_fields = LogSanitizer._PARTIAL_MASK_FIELDS | {'ip', 'ipaddr', 'operip', 'loginip'} + ip_key_pattern = '|'.join( + sorted( + { + _build_text_key_pattern(field_name) + for field_name in (*LogSanitizer._TEXT_PARTIAL_FIELDS, 'ip', 'ipaddr', 'oper_ip', 'login_ip') + }, + key=len, + reverse=True, + ) + ) + + with ( + patch.object(LogSanitizer, '_PARTIAL_MASK_FIELDS', ip_fields), + patch.object(LogSanitizer, '_PARTIAL_KV_PATTERNS', _build_text_assignment_patterns(ip_key_pattern)), + ): + sanitized = LogSanitizer.sanitize_text(text) + + assert '192.168.1.10' not in sanitized + assert '192.168.1.11' not in sanitized + assert '192.168.1.*' in sanitized + + +def test_sanitize_text_masks_python_repr_like_payload() -> None: + text = "ValueError({'secret_key': 'abc', 'password': '123456', 'api_key': 'sk-123'})" + expected_mask_count = 3 + + sanitized = LogSanitizer.sanitize_text(text) + + assert 'abc' not in sanitized + assert '123456' not in sanitized + assert 'sk-123' not in sanitized + assert sanitized.count('******') >= expected_mask_count + + +def test_sanitize_text_masks_alphanumeric_verification_code() -> None: + text = '短信验证码为A1B2' + + sanitized = LogSanitizer.sanitize_text(text) + + assert 'A1B2' not in sanitized + assert '******' in sanitized + + +def test_sanitize_text_returns_original_when_mask_disabled() -> None: + text = 'Authorization=Bearer abc.def password=123456 api_key=sk-test' + + with patch.object(LogConfig, 'log_mask_enabled', False): + sanitized = LogSanitizer.sanitize_text(text) + + assert sanitized == text + + +def test_build_json_payload_sanitizes_exception_and_extra() -> None: + initializer = LoggerInitializer() + try: + raise ValueError('token=abc.def') + except ValueError as exc: + exception = SimpleNamespace( + type=type(exc), + value=exc, + traceback=exc.__traceback__, + ) + record = { + 'time': 
datetime(2026, 4, 20, 12, 0, 0), + 'level': SimpleNamespace(name='ERROR'), + 'message': 'password=123456', + 'name': 'test_logger', + 'module': 'test_module', + 'function': 'test_function', + 'line': 1, + 'extra': {'authorization': 'Bearer abc.def', 'trace_id': 'trace-1'}, + 'exception': exception, + } + + initializer._patch_record(record) + initializer._filter(record) + payload = initializer._build_json_payload(record) + + assert '123456' not in payload['message'] + assert 'abc.def' not in payload['exception']['value'] + assert 'abc.def' not in payload['exception']['traceback'] + assert 'ValueError' in payload['exception']['traceback'] + assert payload['extra']['authorization'] == '******' + + +def test_plain_log_formatter_sanitizes_exception_traceback() -> None: + initializer = LoggerInitializer() + outputs = [] + test_logger = _logger.patch(initializer._patch_record) + test_logger.remove() + test_logger.add( + lambda message: outputs.append(str(message)), + format=initializer._plain_log_formatter, + filter=initializer._filter, + backtrace=False, + diagnose=False, + ) + + try: + raise ValueError('password=123456 token=abc.def') + except ValueError: + test_logger.exception('boom password=123456') + + output = outputs[0] + + assert '123456' not in output + assert 'abc.def' not in output + assert 'ValueError' in output + assert '******' in output + + +def test_get_request_params_returns_structured_payload() -> None: + async def receive() -> dict: + return { + 'type': 'http.request', + 'body': b'{"password":"plain-password","phonenumber":"13812345678"}', + 'more_body': False, + } + + request = Request( + { + 'type': 'http', + 'method': 'POST', + 'path': '/test', + 'headers': [(b'content-type', b'application/json')], + 'path_params': {}, + 'query_string': b'page=1', + }, + receive=receive, + ) + + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + params = asyncio.run(log_decorator._get_request_params(request)) + + assert 
params['query_params']['page'] == '1' + assert params['json_body']['password'] == 'plain-password' + + +def test_get_request_params_falls_back_to_raw_body_when_json_invalid() -> None: + async def receive() -> dict: + return { + 'type': 'http.request', + 'body': b'{bad json', + 'more_body': False, + } + + request = Request( + { + 'type': 'http', + 'method': 'POST', + 'path': '/test', + 'headers': [(b'content-type', b'application/json')], + 'path_params': {}, + 'query_string': b'', + }, + receive=receive, + ) + + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + params = asyncio.run(log_decorator._get_request_params(request)) + + assert params['raw_body'] == '{bad json' + mock_warning.assert_called_once() + assert '解析失败' in mock_warning.call_args.args[0] + + +def test_get_request_params_decodes_non_utf8_body_without_raising() -> None: + async def receive() -> dict: + return { + 'type': 'http.request', + 'body': b'\xff\xfeabc', + 'more_body': False, + } + + request = Request( + { + 'type': 'http', + 'method': 'POST', + 'path': '/test', + 'headers': [(b'content-type', b'text/plain')], + 'path_params': {}, + 'query_string': b'', + }, + receive=receive, + ) + + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + params = asyncio.run(log_decorator._get_request_params(request)) + + assert params['raw_body'].endswith('abc') + assert '\ufffd' in params['raw_body'] + + +def test_log_handles_missing_user_agent_without_error() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER, log_type='login') + + assert log_decorator._get_oper_type(None) == 0 + + login_log = log_decorator._get_login_log( + None, + oper_ip='127.0.0.1', + oper_location='内网IP', + oper_time=datetime(2026, 4, 20, 12, 0, 0), + origin_kwargs={}, + ) + + assert login_log['ipaddr'] == '127.0.0.1' + assert login_log['loginLocation'] == '内网IP' + assert login_log['browser'] 
== 'Other' + assert login_log['os'] == 'Other' + + +def test_build_log_text_with_summary_mode() -> None: + log_decorator = Log( + title='测试日志', + business_type=BusinessType.OTHER, + request_log_mode='summary', + ) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'query_params': {'page': '1'}, + 'json_body': {'password': 'plain-password', 'modelName': 'demo-model'}, + } + ), + mode='summary', + include_fields=(), + exclude_fields=(), + payload_kind='request', + ) + log_payload = json.loads(log_text) + + assert log_payload['mode'] == 'summary' + assert log_payload['query_param_keys'] == ['page'] + assert log_payload['json_body_keys'] == ['password', 'modelName'] + + +def test_build_log_text_with_include_mode() -> None: + log_decorator = Log( + title='测试日志', + business_type=BusinessType.OTHER, + response_log_mode='include', + response_include_fields=('data.token', 'data.userName'), + ) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'code': 200, + 'msg': 'success', + 'data': {'token': 'abc.def', 'userName': 'admin', 'extra': 'ignored'}, + } + ), + mode='include', + include_fields=('data.token', 'data.userName'), + exclude_fields=(), + payload_kind='response', + ) + log_payload = json.loads(log_text) + + assert log_payload['mode'] == 'include' + assert log_payload['selected']['data.token'] == '******' + assert log_payload['selected']['data.userName'] == 'admin' + assert 'data.extra' not in log_payload['selected'] + + +def test_include_fields_support_snake_case_matching_camel_case_payload() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'json_body': { + 'modelCode': 'deepseek-chat', + 'supportReasoning': 'Y', + } + } + ), + mode='include', + include_fields=('json_body.model_code', 'json_body.support_reasoning'), + exclude_fields=(), + payload_kind='request', + ) + log_payload = 
json.loads(log_text) + + assert log_payload['selected']['json_body.model_code'] == 'deepseek-chat' + assert log_payload['selected']['json_body.support_reasoning'] == 'Y' + + +def test_include_fields_support_camel_case_matching_snake_case_payload() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'data': { + 'user_name': 'admin', + 'login_ip': '192.168.1.10', + } + } + ), + mode='include', + include_fields=('data.userName', 'data.loginIp'), + exclude_fields=(), + payload_kind='response', + ) + log_payload = json.loads(log_text) + + assert log_payload['selected']['data.userName'] == 'admin' + assert log_payload['selected']['data.loginIp'] == '192.168.1.10' + + +def test_request_field_helper_builds_canonical_path() -> None: + assert RequestLogFieldRoot.JSON_BODY.field('model_code') == 'json_body.model_code' + assert ResponseLogFieldRoot.DATA.field('userName') == 'data.userName' + + +def test_code_field_masking_only_applies_to_login_like_verification_code() -> None: + login_payload = { + 'uuid': 'captcha-session', + 'code': 'A1B2', + 'msg': 'login', + } + business_payload = { + 'code': '200', + 'msg': 'ok', + } + + sanitized_login_payload = LogSanitizer.sanitize_data(login_payload) + sanitized_business_payload = LogSanitizer.sanitize_data(business_payload) + + assert sanitized_login_payload['code'] == '******' + assert sanitized_business_payload['code'] == '200' + + +def test_build_log_text_with_exclude_mode() -> None: + log_decorator = Log( + title='测试日志', + business_type=BusinessType.OTHER, + request_log_mode='exclude', + request_exclude_fields=('json_body.api_key',), + ) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'json_body': { + 'modelCode': 'deepseek-chat', + 'apiKey': 'sk-test', + 'baseUrl': 'https://api.example.com', + } + } + ), + mode='exclude', + include_fields=(), + exclude_fields=('json_body.api_key',), + 
payload_kind='request', + ) + log_payload = json.loads(log_text) + + assert 'apiKey' not in log_payload['json_body'] + assert log_payload['json_body']['modelCode'] == 'deepseek-chat' + assert log_payload['json_body']['baseUrl'] == 'https://api.example.com' + + +def test_exclude_fields_support_snake_case_matching_camel_case_payload() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'data': { + 'apiKey': '******', + 'baseUrl': 'https://api.example.com', + } + } + ), + mode='exclude', + include_fields=(), + exclude_fields=('data.api_key',), + payload_kind='response', + ) + log_payload = json.loads(log_text) + + assert 'apiKey' not in log_payload['data'] + assert log_payload['data']['baseUrl'] == 'https://api.example.com' + + +def test_exclude_fields_remove_multiple_list_indexes_without_shift() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'rows': [ + {'id': 1}, + {'id': 2}, + {'id': 3}, + ] + } + ), + mode='exclude', + include_fields=(), + exclude_fields=('rows.0', 'rows.1'), + payload_kind='response', + ) + log_payload = json.loads(log_text) + + assert log_payload['rows'] == [{'id': 3}] + + +def test_collect_field_path_warnings_for_invalid_root() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + warnings = log_decorator._collect_field_path_warnings( + mode='include', + include_fields=('body.modelCode',), + exclude_fields=(), + payload_kind='request', + ) + + assert len(warnings) == 1 + assert 'body.modelCode' in warnings[0] + assert 'json_body' in warnings[0] + + +def test_collect_field_path_warnings_accepts_normalized_root() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + warnings = log_decorator._collect_field_path_warnings( + mode='include', + 
include_fields=('jsonBody.modelCode',), + exclude_fields=(), + payload_kind='request', + ) + + assert warnings == [] + + +def test_collect_field_path_warnings_for_missing_include_fields() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + warnings = log_decorator._collect_field_path_warnings( + mode='include', + include_fields=(), + exclude_fields=(), + payload_kind='response', + ) + + assert len(warnings) == 1 + assert 'include模式' in warnings[0] + assert 'data' in warnings[0] + + +def test_collect_field_path_warnings_for_missing_exclude_fields() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + warnings = log_decorator._collect_field_path_warnings( + mode='exclude', + include_fields=(), + exclude_fields=(), + payload_kind='request', + ) + + assert len(warnings) == 1 + assert 'exclude模式' in warnings[0] + assert 'json_body' in warnings[0] + + +def test_log_warns_when_include_fields_do_not_match_mode() -> None: + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + Log( + title='测试日志', + business_type=BusinessType.OTHER, + request_log_mode='summary', + request_include_fields=('json_body.modelCode',), + ) + + mock_warning.assert_called_once() + assert '不会生效' in mock_warning.call_args.args[0] + + +def test_request_include_fields_invalid_root_raises_value_error() -> None: + try: + Log( + title='测试日志', + business_type=BusinessType.OTHER, + request_log_mode='include', + request_include_fields=('body.modelCode',), + ) + except ValueError as exc: + assert 'body.modelCode' in str(exc) + else: + raise AssertionError('expected ValueError for invalid request include root') + + +def test_request_exclude_fields_invalid_root_raises_value_error() -> None: + try: + Log( + title='测试日志', + business_type=BusinessType.OTHER, + request_log_mode='exclude', + request_exclude_fields=('body.apiKey',), + ) + except ValueError as exc: + assert 'body.apiKey' in str(exc) + else: + raise 
AssertionError('expected ValueError for invalid request exclude root') + + +def test_build_log_text_warns_for_missing_include_field() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'json_body': { + 'modelCode': 'deepseek-chat', + } + } + ), + mode='include', + include_fields=('json_body.modleCode',), + exclude_fields=(), + payload_kind='request', + ) + + log_payload = json.loads(log_text) + + assert log_payload['selected'] == {} + mock_warning.assert_called_once() + assert '未命中' in mock_warning.call_args.args[0] + assert 'modleCode' in mock_warning.call_args.args[0] + + +def test_missing_include_field_warning_only_emitted_once() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + payload = LogSanitizer.sanitize_data({'data': {'userName': 'admin'}}) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + log_decorator._build_log_text( + payload, + mode='include', + include_fields=('data.userNmae',), + exclude_fields=(), + payload_kind='response', + ) + log_decorator._build_log_text( + payload, + mode='include', + include_fields=('data.userNmae',), + exclude_fields=(), + payload_kind='response', + ) + + mock_warning.assert_called_once() + + +def test_include_fields_keep_explicit_none_value() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'data': { + 'userName': None, + } + } + ), + mode='include', + include_fields=('data.userName',), + exclude_fields=(), + payload_kind='response', + ) + + log_payload = json.loads(log_text) + + assert 'data.userName' in log_payload['selected'] + assert log_payload['selected']['data.userName'] is 
None + mock_warning.assert_not_called() + + +def test_include_field_warning_reports_ambiguous_normalized_match() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'data': { + 'user_name': 'admin', + 'userName': 'root', + } + } + ), + mode='include', + include_fields=('data.username',), + exclude_fields=(), + payload_kind='response', + ) + + log_payload = json.loads(log_text) + + assert log_payload['selected'] == {} + mock_warning.assert_called_once() + assert '命名冲突' in mock_warning.call_args.args[0] + assert 'user_name' in mock_warning.call_args.args[0] + assert 'userName' in mock_warning.call_args.args[0] + + +def test_build_log_text_warns_for_missing_exclude_field() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + with patch('common.annotation.log_annotation.logger.warning') as mock_warning: + log_text = log_decorator._build_log_text( + LogSanitizer.sanitize_data( + { + 'json_body': { + 'modelCode': 'deepseek-chat', + } + } + ), + mode='exclude', + include_fields=(), + exclude_fields=('json_body.apiKey',), + payload_kind='request', + ) + + log_payload = json.loads(log_text) + + assert log_payload['json_body']['modelCode'] == 'deepseek-chat' + mock_warning.assert_called_once() + assert '排除路径未命中' in mock_warning.call_args.args[0] + + +def test_build_summary_payload_supports_message_field_fallback() -> None: + log_decorator = Log(title='测试日志', business_type=BusinessType.OTHER) + + summary_payload = log_decorator._build_summary_payload({'code': 200, 'message': '获取成功'}, 'response') + status, error_msg = log_decorator._get_status_and_error_msg({'code': 500, 'message': '获取失败'}) + + assert summary_payload['msg'] == '获取成功' + assert status == 1 + assert error_msg == '获取失败' diff --git a/ruoyi-fastapi-backend/utils/log_util.py 
b/ruoyi-fastapi-backend/utils/log_util.py index 0817faf..298af7f 100644 --- a/ruoyi-fastapi-backend/utils/log_util.py +++ b/ruoyi-fastapi-backend/utils/log_util.py @@ -1,7 +1,11 @@ +import ast import json import logging import os +import re import sys +import traceback +from collections.abc import Mapping, Sequence from typing import Any from loguru import logger as _logger @@ -12,7 +16,375 @@ from utils.server_util import WorkerIdUtil +def _split_field_tokens(field_name: str) -> tuple[str, ...]: + """ + 拆分字段名为文本匹配用的 token + + :param field_name: 原始字段名 + :return: token 元组 + """ + separated_name = re.sub(r'([a-z0-9])([A-Z])', r'\1 \2', field_name) + normalized_name = re.sub(r'[^A-Za-z0-9]+', ' ', separated_name) + return tuple(token.lower() for token in normalized_name.split() if token) + + +def _build_text_key_pattern(field_name: str) -> str: + """ + 构建文本日志中的字段名匹配正则,自动兼容 snake_case / camelCase / kebab-case / dot.case + + :param field_name: 原始字段名 + :return: 正则片段 + """ + tokens = _split_field_tokens(field_name) + if not tokens: + return re.escape(field_name) + return r'[\s._-]*'.join(re.escape(token) for token in tokens) + + +def _build_text_assignment_patterns(key_pattern: str) -> list[re.Pattern[str]]: + """ + 构建文本日志中的键值对匹配正则 + + :param key_pattern: 字段名正则片段 + :return: 正则列表 + """ + if not key_pattern: + return [] + return [ + re.compile( + rf'(?P<prefix>(?P<key_quote>[\'"]?)(?P<key>(?:{key_pattern}))(?P=key_quote)\s*[:=]\s*)' + r'(?P<quote>[\'"])(?P<value>.*?)(?P=quote)', + re.IGNORECASE, + ), + re.compile( + rf'(?P<prefix>(?P<key_quote>[\'"]?)(?P<key>(?:{key_pattern}))(?P=key_quote)\s*[:=]\s*)' + r'(?P<quote>[\'"]?)(?P<value>[^\'"\s,;]+)(?P=quote)', + re.IGNORECASE, + ), + ] + + +class LogSanitizer: + """ + 日志脱敏工具 + """ + + _MASK = LogConfig.log_mask_placeholder + _PHONE_MIN_LENGTH = 7 + _PHONE_PREFIX_LENGTH = 3 + _PHONE_SUFFIX_LENGTH = 4 + _EMAIL_SHORT_LOCAL_LENGTH = 2 + _IPV4_PARTS = 4 + _IPV4_VISIBLE_PARTS = 3 + _IPV6_MASK_THRESHOLD = 2 + _IPV6_VISIBLE_PARTS = 2 + _SENSITIVE_FIELDS = { + re.sub(r'[^a-z0-9]', '', item.lower()) + for item 
in (field.strip() for field in LogConfig.log_mask_fields.split(',')) + if item + } + _TEXT_SENSITIVE_FIELDS = tuple(field.strip() for field in LogConfig.log_mask_fields.split(',') if field.strip()) + _TEXT_SENSITIVE_KEY_PATTERN = '|'.join( + sorted({_build_text_key_pattern(field_name) for field_name in _TEXT_SENSITIVE_FIELDS}, key=len, reverse=True) + ) + _PARTIAL_MASK_FIELDS = { + re.sub(r'[^a-z0-9]', '', item.lower()) + for item in (field.strip() for field in LogConfig.log_partial_mask_fields.split(',')) + if item + } + _TEXT_PARTIAL_FIELDS = tuple( + field.strip() for field in LogConfig.log_partial_mask_fields.split(',') if field.strip() + ) + _TEXT_PARTIAL_KEY_PATTERN = '|'.join( + sorted({_build_text_key_pattern(field_name) for field_name in _TEXT_PARTIAL_FIELDS}, key=len, reverse=True) + ) + _CONFIG_SECRET_PATTERNS = [ + re.compile(pattern.strip(), re.IGNORECASE) + for pattern in LogConfig.log_config_secret_patterns.split(',') + if pattern.strip() + ] + _KV_PATTERNS = [ + re.compile( + r'(?P<prefix>authorization\s*[:=]\s*)(?P<quote>[\'"]?)(?P<value>bearer\s+[^\s\'",;]+|[^\s\'",;]+)(?P=quote)', + re.IGNORECASE, + ), + *_build_text_assignment_patterns(_TEXT_SENSITIVE_KEY_PATTERN), + re.compile(r'(?P<prefix>Bearer\s+)(?P<value>[A-Za-z0-9\-._~+/]+=*)', re.IGNORECASE), + ] + _PARTIAL_KV_PATTERNS = _build_text_assignment_patterns(_TEXT_PARTIAL_KEY_PATTERN) + _LOGIN_CODE_PATTERN = re.compile(r'^[A-Za-z0-9]{4,8}$') + + @classmethod + def sanitize_data(cls, data: Any, field_name: str | None = None) -> Any: + """ + 对日志数据进行脱敏 + + :param data: 原始日志数据 + :param field_name: 当前字段名 + :return: 脱敏后的数据 + """ + if not LogConfig.log_mask_enabled: + return data + if data is None: + return None + if hasattr(data, 'model_dump'): + return cls.sanitize_data(data.model_dump(by_alias=True, exclude_none=True), field_name) + if isinstance(data, Mapping): + return cls._sanitize_mapping(data) + if isinstance(data, str): + return cls._sanitize_string(data, field_name) + if isinstance(data, bytes): + return f'' + if 
isinstance(data, Sequence) and not isinstance(data, (str, bytes, bytearray)): + return [cls.sanitize_data(item, field_name) for item in data] + return data + + @classmethod + def sanitize_text(cls, text: str) -> str: + """ + 对普通文本日志进行脱敏 + + :param text: 原始文本 + :return: 脱敏后的文本 + """ + if not LogConfig.log_mask_enabled: + return text + if not isinstance(text, str): + return text + sanitized_text = cls._sanitize_string(text) + return sanitized_text if isinstance(sanitized_text, str) else json.dumps(sanitized_text, ensure_ascii=False) + + @classmethod + def _sanitize_mapping(cls, data: Mapping[Any, Any]) -> dict[Any, Any]: + """ + 对字典类型数据进行脱敏 + + :param data: 字典数据 + :return: 脱敏后的字典 + """ + sanitized: dict[Any, Any] = {} + normalized_map = {cls._normalize_key(str(key)): value for key, value in data.items()} + + for key, value in data.items(): + key_str = str(key) + normalized_key = cls._normalize_key(key_str) + if cls._should_fully_mask_field(normalized_key, value, normalized_map): + sanitized[key] = cls._MASK + elif normalized_key in cls._PARTIAL_MASK_FIELDS and isinstance(value, str): + sanitized[key] = cls._mask_partial_value(value, normalized_key) + else: + sanitized[key] = cls.sanitize_data(value, key_str) + + config_key = normalized_map.get('configkey') + if isinstance(config_key, str) and cls._is_secret_config_key(config_key): + for key in data: + if cls._normalize_key(str(key)) == 'configvalue': + sanitized[key] = cls._MASK + + return sanitized + + @classmethod + def _sanitize_string(cls, value: str, field_name: str | None = None) -> Any: + """ + 对字符串数据进行脱敏 + + :param value: 字符串值 + :param field_name: 当前字段名 + :return: 脱敏后的字符串或结构化数据 + """ + normalized_field = cls._normalize_key(field_name or '') + if normalized_field in cls._SENSITIVE_FIELDS: + return cls._MASK + if normalized_field in cls._PARTIAL_MASK_FIELDS: + return cls._mask_partial_value(value, normalized_field) + + stripped_value = value.strip() + if stripped_value and stripped_value[0] in '{[': + try: + 
parsed_value = json.loads(value) + except (TypeError, ValueError, json.JSONDecodeError): + try: + parsed_value = ast.literal_eval(value) + except (SyntaxError, ValueError): + pass + else: + return cls._dump_sanitized_structured_text( + cls.sanitize_data(parsed_value, field_name), + original_text=value, + ) + else: + return cls._dump_sanitized_structured_text( + cls.sanitize_data(parsed_value, field_name), + original_text=value, + ) + + sanitized_text = value + for pattern in cls._KV_PATTERNS: + sanitized_text = pattern.sub(cls._replace_text_secret, sanitized_text) + for pattern in cls._PARTIAL_KV_PATTERNS: + sanitized_text = pattern.sub(cls._replace_text_partial_secret, sanitized_text) + + if '验证码' in sanitized_text: + sanitized_text = re.sub(r'(验证码(?:为|是)?\s*)([A-Za-z0-9]{4,8})', rf'\1{cls._MASK}', sanitized_text) + + return sanitized_text + + @classmethod + def _replace_text_secret(cls, match: re.Match[str]) -> str: + """ + 替换文本中的敏感值 + + :param match: 正则匹配对象 + :return: 脱敏后的文本 + """ + prefix = match.group('prefix') + quote = match.groupdict().get('quote', '') + if quote is None: + quote = '' + return f'{prefix}{quote}{cls._MASK}{quote}' + + @classmethod + def _replace_text_partial_secret(cls, match: re.Match[str]) -> str: + """ + 替换文本中的部分脱敏字段值 + + :param match: 正则匹配对象 + :return: 脱敏后的文本 + """ + prefix = match.group('prefix') + quote = match.groupdict().get('quote', '') + if quote is None: + quote = '' + normalized_key = cls._normalize_key(match.groupdict().get('key', '')) + masked_value = cls._mask_partial_value(match.group('value'), normalized_key) + return f'{prefix}{quote}{masked_value}{quote}' + + @staticmethod + def _dump_sanitized_structured_text(sanitized_value: Any, original_text: str) -> Any: + """ + 将脱敏后的结构化数据恢复为文本,并尽量保持原始换行风格 + + :param sanitized_value: 脱敏后的结构化数据 + :param original_text: 原始文本 + :return: 文本或原始值 + """ + if isinstance(sanitized_value, (dict, list)): + indent = 2 if '\n' in original_text or '\r' in original_text else None + return 
json.dumps(sanitized_value, ensure_ascii=False, indent=indent) + return sanitized_value + + @classmethod + def _should_fully_mask_field( + cls, normalized_key: str, value: Any, full_mapping: Mapping[str, Any] | None = None + ) -> bool: + """ + 判断字段是否需要全量脱敏 + + :param normalized_key: 标准化后的字段名 + :param value: 字段值 + :param full_mapping: 当前层级的完整字段映射 + :return: 是否需要全量脱敏 + """ + if normalized_key in cls._SENSITIVE_FIELDS: + return True + if normalized_key in {'captchacode', 'smscode'}: + return True + if normalized_key == 'code' and isinstance(value, str): + sibling_keys = set((full_mapping or {}).keys()) + if 'uuid' in sibling_keys and cls._LOGIN_CODE_PATTERN.fullmatch(value): + return True + return False + + @classmethod + def _is_secret_config_key(cls, config_key: str) -> bool: + """ + 判断参数键是否属于敏感配置 + + :param config_key: 参数键 + :return: 是否敏感 + """ + return any(pattern.search(config_key) for pattern in cls._CONFIG_SECRET_PATTERNS) + + @classmethod + def _mask_partial_value(cls, value: str, normalized_field: str) -> str: + """ + 对部分字段进行部分脱敏 + + :param value: 原始值 + :param normalized_field: 标准化后的字段名 + :return: 脱敏后的值 + """ + if normalized_field in {'phonenumber', 'phone', 'mobile'}: + return cls._mask_phone(value) + if normalized_field == 'email': + return cls._mask_email(value) + if normalized_field in {'ip', 'ipaddr', 'operip', 'loginip'}: + return cls._mask_ip(value) + return cls._MASK + + @staticmethod + def _normalize_key(field_name: str) -> str: + """ + 标准化字段名 + + :param field_name: 原始字段名 + :return: 标准化字段名 + """ + return re.sub(r'[^a-z0-9]', '', field_name.lower()) + + @classmethod + def _mask_phone(cls, value: str) -> str: + """ + 手机号脱敏 + + :param value: 原始手机号 + :return: 脱敏后的手机号 + """ + digits = re.sub(r'\D', '', value) + if len(digits) < cls._PHONE_MIN_LENGTH: + return cls._MASK + return f'{digits[: cls._PHONE_PREFIX_LENGTH]}****{digits[-cls._PHONE_SUFFIX_LENGTH :]}' + + @classmethod + def _mask_email(cls, value: str) -> str: + """ + 邮箱脱敏 + + :param value: 原始邮箱 + 
:return: 脱敏后的邮箱 + """ + if '@' not in value: + return cls._MASK + local_part, domain = value.split('@', 1) + masked_local = ( + f'{local_part[:1]}***' + if len(local_part) <= cls._EMAIL_SHORT_LOCAL_LENGTH + else f'{local_part[:1]}***{local_part[-1:]}' + ) + return f'{masked_local}@{domain}' + + @classmethod + def _mask_ip(cls, value: str) -> str: + """ + IP地址脱敏 + + :param value: 原始IP + :return: 脱敏后的IP + """ + if '.' in value: + parts = value.split('.') + if len(parts) == cls._IPV4_PARTS: + return '.'.join([*parts[: cls._IPV4_VISIBLE_PARTS], '*']) + if ':' in value: + parts = value.split(':') + if len(parts) > cls._IPV6_MASK_THRESHOLD: + return ':'.join([*parts[: cls._IPV6_VISIBLE_PARTS], '*', '*']) + return cls._MASK + + class InterceptHandler(logging.Handler): + target_logger = _logger + def emit(self, record: logging.LogRecord) -> None: """ 拦截标准 logging 记录并转发到 Loguru @@ -21,14 +393,14 @@ def emit(self, record: logging.LogRecord) -> None: :return: None """ try: - level = _logger.level(record.levelname).name + level = self.target_logger.level(record.levelname).name except ValueError: level = record.levelno frame, depth = logging.currentframe(), 2 while frame and frame.f_code.co_filename == logging.__file__: frame = frame.f_back depth += 1 - _logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + self.target_logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) class LoggerInitializer: @@ -71,26 +443,67 @@ def _filter(self, record: dict) -> bool: record['extra']['worker_id'] = self.worker_id record['extra']['instance_id'] = self.instance_id record['extra']['service'] = self.service_name - if record['extra'].get('startup_phase'): - return bool(record['extra'].get('startup_log_enabled')) + startup_phase = record['extra'].get('startup_phase') + startup_log_enabled = record['extra'].get('startup_log_enabled') + record['extra'] = LogSanitizer.sanitize_data(record['extra']) + if startup_phase: + return 
bool(startup_log_enabled) return True - def _stdout_sink(self, message: Any) -> None: + @staticmethod + def _get_exception_value_text(exception: Any) -> str | None: """ - 将 Loguru 日志记录序列化为 JSON 并输出到 stdout + 获取脱敏后的异常值文本 - :param message: Loguru 消息对象 - :return: None + :param exception: Loguru 异常对象 + :return: 脱敏后的异常值文本 + """ + if not exception or not exception.value: + return None + return LogSanitizer.sanitize_text(str(exception.value)) + + def _get_exception_traceback_text(self, exception: Any) -> str | None: + """ + 获取脱敏后的异常堆栈文本 + + :param exception: Loguru 异常对象 + :return: 脱敏后的异常堆栈文本 + """ + if not exception or not exception.traceback: + return None + if isinstance(exception.traceback, str): + traceback_text = exception.traceback + elif exception.type and exception.value: + traceback_text = ''.join(traceback.format_exception(exception.type, exception.value, exception.traceback)) + else: + traceback_text = str(exception.traceback) + return LogSanitizer.sanitize_text(traceback_text.rstrip()) + + def _build_plain_exception_suffix(self, record: dict) -> str: + """ + 构建普通文本日志使用的异常后缀 + + :param record: Loguru 日志记录字典 + :return: 异常后缀文本 + """ + exception_text = self._get_exception_traceback_text(record.get('exception')) + return f'\n{exception_text}' if exception_text else '' + + def _build_json_payload(self, record: dict) -> dict: + """ + 构建统一的 JSON 日志结构 + + :param record: Loguru 日志记录字典 + :return: JSON 日志结构 """ - record = message.record exception = None if record['exception']: exception = { 'type': record['exception'].type.__name__ if record['exception'].type else None, - 'value': str(record['exception'].value) if record['exception'].value else None, - 'traceback': str(record['exception'].traceback) if record['exception'].traceback else None, + 'value': self._get_exception_value_text(record['exception']), + 'traceback': self._get_exception_traceback_text(record['exception']), } - payload = { + return { 'timestamp': record['time'].isoformat(), 'level': 
record['level'].name, 'message': record['message'], @@ -109,7 +522,44 @@ def _stdout_sink(self, message: Any) -> None: 'exception': exception, 'extra': record['extra'], } - sys.stdout.write(json.dumps(payload, ensure_ascii=False, default=str) + '\n') + + def _json_log_formatter(self, record: dict) -> str: + """ + 将 Loguru 日志记录格式化为 JSON 文本 + + :param record: Loguru 日志记录字典 + :return: JSON 格式文本 + """ + return json.dumps(self._build_json_payload(record), ensure_ascii=False, default=str) + '\n' + + def _plain_log_formatter(self, record: dict) -> str: + """ + 将 Loguru 日志记录格式化为普通文本,并追加脱敏后的异常堆栈 + + :param record: Loguru 日志记录字典 + :return: 普通文本格式模板 + """ + record['extra']['sanitized_exception'] = self._build_plain_exception_suffix(record) + return ( + '{time:YYYY-MM-DD HH:mm:ss.SSS} | ' + '{extra[trace_id]} | ' + '{extra[span_id]} | ' + '{extra[request_id]} | ' + '{extra[worker_id]} | ' + '{level: <8} | ' + '{name}:{function}:{line} - ' + '{message}{extra[sanitized_exception]}' + ) + + def _patch_record(self, record: dict) -> dict: + """ + 在日志落地前统一脱敏 + + :param record: Loguru 日志记录字典 + :return: 脱敏后的日志记录字典 + """ + record['message'] = LogSanitizer.sanitize_text(record['message']) + return record def _info_file_filter(self, record: dict) -> bool: """ @@ -148,58 +598,75 @@ def init_log(self) -> Logger: :return: 已配置的 Loguru Logger 实例 """ - _logger.remove() + configured_logger = _logger.patch(self._patch_record) + InterceptHandler.target_logger = configured_logger + configured_logger.remove() info_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'info.log') error_log_path = os.path.join(self._log_base_dir, '{time:YYYY}', '{time:MM}', '{time:DD}', 'error.log') if LogConfig.loguru_stdout: if LogConfig.loguru_json: - _logger.add( - self._stdout_sink, + configured_logger.add( + sys.stdout, level=LogConfig.loguru_level, enqueue=True, filter=self._filter, + format=self._json_log_formatter, ) else: - format_str = ( - '{time:YYYY-MM-DD HH:mm:ss.SSS} | ' 
- '{extra[trace_id]} | ' - '{extra[span_id]} | ' - '{extra[request_id]} | ' - '{extra[worker_id]} | ' - '{level: <8} | ' - '{name}:{function}:{line} - ' - '{message}' - ) - _logger.add( + configured_logger.add( sys.stdout, level=LogConfig.loguru_level, enqueue=True, filter=self._filter, - format=format_str, + format=self._plain_log_formatter, ) if self._log_file_enabled: - _logger.add( - info_log_path, - level='INFO', - rotation=LogConfig.loguru_rotation, - retention=LogConfig.loguru_retention, - compression=LogConfig.loguru_compression, - enqueue=True, - filter=self._info_file_filter, - serialize=LogConfig.loguru_json, - ) - _logger.add( - error_log_path, - level='WARNING', - rotation=LogConfig.loguru_rotation, - retention=LogConfig.loguru_retention, - compression=LogConfig.loguru_compression, - enqueue=True, - filter=self._error_file_filter, - serialize=LogConfig.loguru_json, - ) + if LogConfig.loguru_json: + configured_logger.add( + info_log_path, + level='INFO', + rotation=LogConfig.loguru_rotation, + retention=LogConfig.loguru_retention, + compression=LogConfig.loguru_compression, + enqueue=True, + filter=self._info_file_filter, + serialize=False, + format=self._json_log_formatter, + ) + configured_logger.add( + error_log_path, + level='WARNING', + rotation=LogConfig.loguru_rotation, + retention=LogConfig.loguru_retention, + compression=LogConfig.loguru_compression, + enqueue=True, + filter=self._error_file_filter, + serialize=False, + format=self._json_log_formatter, + ) + else: + configured_logger.add( + info_log_path, + level='INFO', + rotation=LogConfig.loguru_rotation, + retention=LogConfig.loguru_retention, + compression=LogConfig.loguru_compression, + enqueue=True, + filter=self._info_file_filter, + format=self._plain_log_formatter, + ) + configured_logger.add( + error_log_path, + level='WARNING', + rotation=LogConfig.loguru_rotation, + retention=LogConfig.loguru_retention, + compression=LogConfig.loguru_compression, + enqueue=True, + 
filter=self._error_file_filter, + format=self._plain_log_formatter, + ) self._configure_logging() - return _logger + return configured_logger # 初始化日志处理器