From 1080261256fc0d5ba0eb2af83dc67a4b386eb06e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 20 May 2026 13:33:29 +0000 Subject: [PATCH] Add log discovery analyzer task for extensionless timestamp density scanning --- .../openrelik-worker-analyzer-logs/Dockerfile | 4 + .../src/log_discovery.py | 176 ++++++++++++++++++ .../src/tasks.py | 58 ++++++ 3 files changed, 238 insertions(+) create mode 100644 workers/openrelik-worker-analyzer-logs/src/log_discovery.py diff --git a/workers/openrelik-worker-analyzer-logs/Dockerfile b/workers/openrelik-worker-analyzer-logs/Dockerfile index 00ff466..2d9246c 100644 --- a/workers/openrelik-worker-analyzer-logs/Dockerfile +++ b/workers/openrelik-worker-analyzer-logs/Dockerfile @@ -9,6 +9,10 @@ RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selectio RUN apt-get update && apt-get install -y --no-install-recommends \ curl \ python3-poetry \ + sudo \ + fdisk \ + qemu-utils \ + ntfs-3g \ # Add your dependencies here && rm -rf /var/lib/apt/lists/* diff --git a/workers/openrelik-worker-analyzer-logs/src/log_discovery.py b/workers/openrelik-worker-analyzer-logs/src/log_discovery.py new file mode 100644 index 0000000..7baa55c --- /dev/null +++ b/workers/openrelik-worker-analyzer-logs/src/log_discovery.py @@ -0,0 +1,176 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import logging +from openrelik_worker_common.file_utils import create_output_file, is_disk_image +from openrelik_worker_common.mount_utils import BlockDevice + +logger = logging.getLogger(__name__) + +# Common extensions to manually catch raw images if OpenRelik's internal tag is missing +DISK_IMAGE_EXTENSIONS = ('.dd', '.raw', '.e01', '.aff4', '.qcow2', '.vmdk', '.vdi', '.ova', '.iso') + +# Matches standard ISO 8601 dates, Syslog formats, and common log level prefixes. +LOG_PATTERN_REGEX = re.compile( + r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}|' + r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}|' + r'\[(?:DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL|NOTICE)\]|' + r'^(?:DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL|NOTICE):', + re.IGNORECASE +) + +def is_binary(filepath: str) -> bool: + """ + Quickly checks for the presence of null bytes in the file header. + This prevents the worker from wasting CPU cycles running regex over + compiled executables, media files, or proprietary binary databases. + """ + try: + with open(filepath, 'rb') as f: + return b'\0' in f.read(1024) + except IOError: + return True + +class LogDiscoveryAnalyzer: + """ + Analyzes raw file contents to discover unparsed plaintext logs based on timestamp density. + This enables log recovery when file extensions are missing or misleading. + """ + def __init__(self, threshold: float, mount_disk_images: bool): + self.threshold = threshold + self.mount_disk_images = mount_disk_images + + def check_file(self, file_path: str, formatted_path: str, path_list: list): + """Scans a single file's contents and appends it to the path list if it meets the density threshold.""" + + # Explicitly skip nested disk images to prevent recursive "Russian Doll" mounting/analysis + if file_path.lower().endswith(DISK_IMAGE_EXTENSIONS): + return + + # Skip compiled binaries or media files + if is_binary(file_path): + return + + try: + match_count = 0 + line_count = 0 + + # Process line-by-line to prevent memory exhaustion on massive files + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + for line in f: + line_count += 1 + if LOG_PATTERN_REGEX.search(line): + match_count += 1 + + # Performance Cap: A genuine log file will demonstrate its density + # well within the first 500 lines. + if line_count >= 500: + break + + # Flag the file if the ratio of log-like lines meets the user's defined threshold + if line_count > 0 and (match_count / line_count) >= self.threshold: + path_list.append(formatted_path) + + except Exception as e: + logger.warning(f"Log discovery failed to read {file_path}: {e}") + + def analyze(self, input_files: list, output_path: str) -> tuple: + """Main orchestration loop: Mounts disks, walks directories, and generates the discovery report.""" + discovered_data = [] + disks_mounted = [] + output_files = [] + total_logs = 0 + + try: + for input_file in input_files: + path = input_file.get('path') + base_name = input_file.get('display_name', path) + evidence_id = input_file.get('uuid', input_file.get('id', 'UNKNOWN_UUID')) + + current_evidence = { + 'id': evidence_id, + 'name': base_name, + 'paths': [] + } + + # Evaluate mounting requirement based on OpenRelik metadata OR standard file extensions + is_disk = is_disk_image(input_file) or path.lower().endswith(DISK_IMAGE_EXTENSIONS) + + # Scenario A: Native disk image mounting + if self.mount_disk_images and is_disk: + logger.info(f"Mounting disk image for log discovery: {base_name}") + bd = BlockDevice(path, min_partition_size=1) + bd.setup() + mountpoints = bd.mount() + disks_mounted.append(bd) + + if not mountpoints: + logger.warning(f"Failed to mount viable partitions in {base_name}") + continue + + for mountpoint in mountpoints: + for root, _, files in os.walk(mountpoint): + for filename in files: + full_path = os.path.join(root, filename) + formatted_path = "/" + os.path.relpath(full_path, mountpoint) + self.check_file(full_path, formatted_path, current_evidence['paths']) + + # Scenario B: Extracted directory structures + elif os.path.isdir(path): + for root, _, files in os.walk(path): + for filename in files: + full_path = os.path.join(root, filename) + formatted_path = "/" + os.path.relpath(full_path, path) + self.check_file(full_path, formatted_path, current_evidence['paths']) + + # Scenario C: Direct single-file evaluation + elif os.path.isfile(path): + self.check_file(path, "/" + os.path.basename(path), current_evidence['paths']) + + if current_evidence['paths']: + discovered_data.append(current_evidence) + + finally: + # Critical cleanup: Ensure all active BlockDevices are unmounted even if the worker crashes + for blockdevice in disks_mounted: + if blockdevice: + logger.info(f"Unmounting image: {blockdevice.image_path}") + blockdevice.umount() + + # Output Generation + if discovered_data: + report_file = create_output_file( + output_path, + display_name="potential_logs_report.txt", + data_type="openrelik:report" + ) + with open(report_file.path, 'w') as f: + for data in discovered_data: + f.write(f"Evidence ID: {data['id']}\n") + f.write(f"Evidence Name: {data['name']}\n\n") + f.write("Potential Log Files Discovered:\n") + f.write("-" * 50 + "\n") + + for p in sorted(data['paths']): + clean_path = p.replace("//", "/") + f.write(f"{clean_path}\n") + + f.write("\n" + "="*50 + "\n\n") + total_logs += len(data['paths']) + + output_files.append(report_file.to_dict()) + + return output_files, total_logs diff --git a/workers/openrelik-worker-analyzer-logs/src/tasks.py b/workers/openrelik-worker-analyzer-logs/src/tasks.py index 10f60ee..b7e0cdd 100644 --- a/workers/openrelik-worker-analyzer-logs/src/tasks.py +++ b/workers/openrelik-worker-analyzer-logs/src/tasks.py @@ -22,6 +22,7 @@ from .app import celery from .logger import log_root from .ssh_analyzer import LinuxSSHAnalysisTask +from .log_discovery import LogDiscoveryAnalyzer # Task name used to register and route the task to the correct queue. TASK_NAME = "openrelik-worker-analyzer-logs.tasks.ssh_analyzer" @@ -131,3 +132,60 @@ def run_ssh_analyzer( task_report=task_report.to_dict(), meta={}, ) + + + +# --- TASK: LOG DISCOVERY --- +TASK_NAME_DISCOVERY = "openrelik-worker-analyzer-logs.tasks.log_discovery" +TASK_METADATA_DISCOVERY = { + "display_name": "Log Discovery (Timestamp Density)", + "description": "Scans file contents to identify unparsed logs based on timestamp density, ignoring file extensions. KNOWN LIMITATIONS: (1) Skips binary files like Windows Event Logs (.evtx), media, and executables. (2) Skips pure JSON logs unless values match standard date formats. (3) Caps scanning at 500 lines to preserve memory, so heavily padded files may be bypassed.", + "task_config": [ + { + "name": "threshold", + "label": "Density Threshold", + "description": "The minimum ratio of log-like lines required to flag a file (e.g., 0.15 means 15% of the sampled lines must match).", + "type": "text", + "required": False, + "default": "0.15" + }, + { + "name": "mount_disk_images", + "label": "Mount disk images", + "description": "If checked, automatically mounts .dd/.raw images and scans their internal file systems.", + "type": "checkbox", + "required": True, + "default_value": True, + }, + ], +} + +@celery.task(bind=True, name=TASK_NAME_DISCOVERY, metadata=TASK_METADATA_DISCOVERY) +def run_log_discovery( + self, + pipe_result: str = None, + input_files: list = None, + output_path: str = None, + workflow_id: str = None, + task_config: dict = None, +) -> str: + """ + Routes the log discovery request to the LogDiscoveryAnalyzer engine. + """ + log_root.bind(workflow_id=workflow_id) + logger.info(f"Starting {TASK_NAME_DISCOVERY} for workflow {workflow_id}") + + input_files = get_input_files(pipe_result, input_files or []) + + threshold = float(task_config.get("threshold") or 0.15) + mount_disk_images = task_config.get("mount_disk_images", True) + + # Initialize the engine from our separate logic file + analyzer = LogDiscoveryAnalyzer(threshold=threshold, mount_disk_images=mount_disk_images) + output_files, total_logs = analyzer.analyze(input_files=input_files, output_path=output_path) + + return create_task_result( + output_files=output_files, + workflow_id=workflow_id, + meta={"summary": f"Found {total_logs} potential logs."} + )