diff --git a/gitgalaxy/standards/language_standards.py b/gitgalaxy/standards/language_standards.py
index 5688d3f5..d297ffdf 100644
--- a/gitgalaxy/standards/language_standards.py
+++ b/gitgalaxy/standards/language_standards.py
@@ -1242,7 +1242,7 @@
     # ONLY executable logic blocks. EXCLUDES types/classes.
     #
     # =====================================================================
-    # [LLM CONTEXT: C# "IRON WALL" FUNCTION EXTRACTOR & REDOS SHIELD]
+    # [CONTEXT: C# "IRON WALL" FUNCTION EXTRACTOR & REDOS SHIELD]
     # PURPOSE: Anchors executable logic blocks (methods) in C# up to C# 14.
     # VULNERABILITY: C# allows massive return types (e.g., nested tuples),
     # generics, and explicit interface implementations. If spaces are allowed
@@ -2007,7 +2007,7 @@
         ),
         "func_start": re.compile(
             # =====================================================================
-            # [LLM CONTEXT: C++ FUNCTION AST EXTRACTOR & REDOS SHIELD]
+            # [CONTEXT: C++ FUNCTION AST EXTRACTOR & REDOS SHIELD]
             # PURPOSE: Anchors executable logic blocks (methods/functions) in C++.
             # VULNERABILITY: C++ allows multi-line function signatures and complex
             # return types (e.g., `std::vector \n myFunc()`). In files with
@@ -4857,7 +4857,7 @@
         ),
         # 4. func_start (The Satellite Spawner)
         # =====================================================================
-        # [LLM CONTEXT: FORTRAN FUNCTION AST EXTRACTOR & REDOS SHIELD]
+        # [CONTEXT: FORTRAN FUNCTION AST EXTRACTOR & REDOS SHIELD]
         # PURPOSE: Anchors executable logic blocks (Program, Subroutine, Function, Entry)
         # across 60+ years of Fortran dialects (F77 through F2018).
         # VULNERABILITY: Fortran allows extreme signature variability: prefix stacking
@@ -6535,7 +6535,7 @@
         ),
         # 4. func_start: Satellite Spawner. Anchors logic blocks (Paragraphs and Sections).
         # =====================================================================
-        # [LLM CONTEXT: COBOL FUNCTION/PARAGRAPH AST EXTRACTOR & REDOS SHIELD]
+        # [CONTEXT: COBOL FUNCTION/PARAGRAPH AST EXTRACTOR & REDOS SHIELD]
         # PURPOSE: Anchors executable logic blocks (Paragraphs and Sections) in COBOL.
         # VULNERABILITY: COBOL spans 60 years of formatting rules (Fixed vs Free format).
         # Without strict column boundaries, standard verbs or data definitions
diff --git a/gitgalaxy/tools/terabyte_log_scanning/README.md b/gitgalaxy/tools/terabyte_log_scanning/README.md
index a174678e..22f7ecb1 100644
--- a/gitgalaxy/tools/terabyte_log_scanning/README.md
+++ b/gitgalaxy/tools/terabyte_log_scanning/README.md
@@ -4,47 +4,70 @@
 [![Scale](https://img.shields.io/badge/Tested-10GB%2B_Files-00BFFF.svg)](#)
 [![Architecture](https://img.shields.io/badge/Architecture-Single__Pass_Stream-8A2BE2.svg)](#)

-Welcome to the **GitGalaxy Terabyte Log Scanning Suite**.
+During an active incident response or catastrophic data breach, standard tools fail. Basic `grep` lacks time-series context. Modern SIEMs (Splunk, ElasticSearch) require you to ingest and index data first, which takes hours or days for massive database dumps.

-During an active incident response or catastrophic data breach, standard tools fail. Basic `grep` is too rigid and lacks time-series context. Modern SIEMs (like Splunk or ElasticSearch) are incredibly powerful, but they require you to ingest and index the data first, a process that takes hours or days for a 10GB+ database dump. You need answers immediately.
+This suite provides a tactical, pipeline-ready solution: **ultra-high-velocity, unindexed binary streaming.** Running at over 2 GB per minute, our custom stream-processing engine reads data continuously without loading massive files into RAM. Perfect for active breach triage or automated CI/CD pipeline sanitization.

-This suite provides a tactical, pipeline-ready solution: **ultra-high-velocity, unindexed binary streaming.** Running at over 2 GB per minute on standard hardware, our custom stream-processing engine reads data continuously without ever loading the massive file into RAM. This makes it perfect for active breach triage, or as an automated CI/CD pipeline job to sanitize server logs before they are permanently archived.
-
-### 1. [The PII Data Leak Hunter](https://squid-protocol.github.io/gitgalaxy/04-06-pii-leak-hunter/) (`pii-leak-hunter`)
+---

-A specialized incident response tool designed to find hemorrhaging Personally Identifiable Information (Credit Cards, SSNs, AWS API Keys) inside massive, raw data dumps.
+## Part 1: The PII Data Leak Hunter (`pii-leak-hunter`)
+[📖 Official Documentation](https://squid-protocol.github.io/gitgalaxy/04-06-pii-leak-hunter/)

-* **Binary-Level Regex Evaluation:** Compiles structural patterns to raw bytes for extreme CPU efficiency.
-* **Automated Data Masking:** Redacts toxic payloads before writing to safe evidence logs.
-* **Exfiltration Histograms:** Generates terminal ASCII charts to pinpoint exact breach minutes.
-* **Pipeline Sanitization:** Runs automatically in CI/CD to block PII log archiving via our [Hunting PII Leaks Recipe](https://squid-protocol.github.io/gitgalaxy/cookbook/hunt-pii-leaks/).
+A specialized incident response tool designed to find hemorrhaging Personally Identifiable Information inside massive, raw data dumps.

-### 2. [The Terabyte Log Scanner](https://squid-protocol.github.io/gitgalaxy/04-07-terabyte-log-scanner/) (`terabyte-log-scanner`)
+**How it works:**
+* **Binary-Level Regex:** Compiles structural patterns to raw bytes for extreme CPU efficiency.
+* **Automated Masking:** Redacts toxic payloads before writing to safe evidence logs.
+* **Exfiltration Histograms:** Generates ASCII charts that pinpoint exact breach minutes.

-A runtime execution tracer that connects static codebase architecture to physical runtime reality. It parses massive mainframe SMF logs or distributed traces to prove what code is actually executing.
+**Performance Showcase:** Streamed a raw **1.00 GB compromised log file**. Completed in **25.72 seconds**. Detected and actively masked over **420,000 sensitive records**. Immediately exposed two distinct attack vectors (customer data at 14:00, AWS keys at 09:00).

-* **Intermediate Representation (IR) Ingestion:** Ingests static repository maps to hunt known compiled programs in the logs.
-* **Execution Verification:** Proves exact runtime execution frequencies in production environments.
-* **Zero-Hit Dead Code:** Mathematically [proves if compiled legacy code is truly abandoned](https://squid-protocol.github.io/gitgalaxy/cookbook/prove-dead-code-logs/).
-* **Dynamic Telemetry:** Outputs sidecar JSON for 3D WebGPU traffic heatmaps.
+### Targeted Patterns
+The stream engine currently bypasses standard indexing to hunt and actively mask (byte-level matching sketched below):
+* **VISA** (Credit Cards)
+* **MASTERCARD** (Credit Cards)
+* **SSN** (US Social Security Numbers)
+* **AWS_KEY** (AKIA, ASIA, AGPA, etc.)
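+
+A minimal sketch of the byte-level idea (illustrative only: `scan_line` is a hypothetical helper, and these simplified patterns are not the shipped set in `pii_leak_hunter.py`):
+
+```python
+import re
+
+# Hypothetical, simplified byte-level patterns; the shipped set is stricter.
+PII_PATTERNS = {
+    "VISA": re.compile(rb"\b4\d{3}(?:[ -]?\d{4}){3}\b"),
+    "SSN": re.compile(rb"\b\d{3}-\d{2}-\d{4}\b"),
+    "AWS_KEY": re.compile(rb"\b(?:AKIA|ASIA|AGPA)[A-Z0-9]{16}\b"),
+}
+
+def scan_line(raw_line: bytes):
+    """Byte-level test first; decode and mask only on a confirmed hit."""
+    for pii_type, pattern in PII_PATTERNS.items():
+        if pattern.search(raw_line):  # cheap test: no per-line decoding
+            text = raw_line.decode("utf-8", errors="ignore").strip()
+            masked = re.sub(pattern.pattern.decode(),
+                            lambda m: "****" + m.group()[-4:], text)
+            return pii_type, masked   # safe to write to an evidence log
+    return None
+
+print(scan_line(b"2026-04-16 14:02 charge card=4111 1111 1111 1111 ok"))
+# ('VISA', '2026-04-16 14:02 charge card=****1111 ok')
+```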
---
+### Quickstart & Integration
+**Local CLI Execution:**
+By default, the tool saves the masked evidence log in the same directory as the target.
+```bash
+pii-leak-hunter /path/to/massive_database_dump.sql
+```

-### ⚡ Performance & Anomaly Detection Showcases
+**Using the `--out` Flag:**
+Route the safe, masked telemetry to a secure directory for analysis.
+```bash
+pii-leak-hunter /path/to/production.log --out /var/secure_logs/
+```

-#### Showcase A: PII Exfiltration & Automated Masking
-To demonstrate incident response capabilities, we streamed a raw **1.00 GB compromised log file**. The PII Leak Hunter chewed through the file in **25.72 seconds**, detecting and actively masking over **420,000 sensitive records**.
+**GitHub Actions CI/CD Integration:**
+Automate sanitization before archiving logs.
+```yaml
+  - name: Run PII Leak Hunter
+    uses: squid-protocol/gitgalaxy@main
+    with:
+      tool: 'pii-leak-hunter'
+      target: './logs/production_dump.sql'
+      args: '--out ./sanitized_logs/'
+```

-The resulting time-series histograms immediately exposed two distinct attack patterns: Customer data (VISA/SSNs) was actively exfiltrated at `14:00`, while infrastructure secrets (AWS Keys) were being scraped on an entirely separate cron schedule at `09:00`.
+---

-![PII Leak Hunter Demo](../../../docs/wiki/assets/pii_leak_hunt.gif)
+## Part 2: The Terabyte Log Scanner (`terabyte-log-scanner`)
+[📖 Official Documentation](https://squid-protocol.github.io/gitgalaxy/04-07-terabyte-log-scanner/)

-#### Showcase B: Runtime Anomaly Detection
-We ran the Terabyte Log Scanner against a raw **2.1GB production stream log**, hunting for specific error and failure signatures. The engine completed the single-pass scan in **30.07 seconds**.
+A runtime execution tracer that connects static codebase architecture to physical runtime reality. It parses massive mainframe SMF logs or distributed traces to prove what code actually executes.

-The dynamically scaled ASCII time-series histograms instantly exposed a massive, coordinated anomaly: a brute-force attack occurring exactly at `14:00` every day, perfectly isolated from millions of lines of background noise.
+**How it works:**
+* **Single-Pass Streaming:** Never loads the full file into RAM (condensed sketch below).
+* **Execution Verification:** Proves exact runtime execution frequencies.
+* **Zero-Hit Detection:** Mathematically proves if compiled legacy code is abandoned.
+* **Dynamic Sidecars:** Outputs telemetry JSON for 3D WebGPU traffic heatmaps.

-![Terabyte Log Scanner Demo](../../../docs/wiki/assets/mega_log_scan.gif)
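+
+The core loop is small enough to sketch. The following is a condensed, illustrative version of the scanning loop from the `terabyte_log_scanner.py` diff below (`stream_scan` is a hypothetical name; the shipped loop also tees matching lines to a results file and wraps I/O errors):
+
+```python
+import re
+from collections import defaultdict
+
+def stream_scan(path, keywords):
+    """One buffered pass over a raw log: hourly hit counts per keyword."""
+    patterns = {kw: re.compile(kw.encode("utf-8"), re.IGNORECASE) for kw in keywords}
+    ts = re.compile(rb"(\d{4}-\d{2}-\d{2}[T\s]\d{2})")   # date + hour = bucket key
+    histograms = {kw: defaultdict(int) for kw in keywords}
+    with open(path, "rb") as f_in:        # binary mode: no per-line decode cost
+        for line in f_in:                 # buffered iteration, never the whole file
+            for kw, pattern in patterns.items():
+                if pattern.search(line):
+                    m = ts.search(line)
+                    bucket = m.group(1).decode() + ":00" if m else "Unknown Time"
+                    histograms[kw][bucket] += 1
+                    break                 # first keyword hit wins for this line
+    return histograms
+```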
+**Performance Showcase:**
+Ran against a raw **2.1GB production stream log**. Completed single-pass scan in **30.07 seconds**.
+Dynamically scaled ASCII histograms instantly exposed a massive brute-force anomaly isolated from background noise:
 ```text
 === TIME-SERIES: ERROR ===
@@ -52,65 +75,35 @@ The dynamically scaled ASCII time-series histograms instantly exposed a massive,
 [2026-04-16 14:00] ███████████████████████████████████████ (5,759 hits) <-- ANOMALY SPIKE
 [2026-04-27 14:00] ███████████████████████████████████████ (5,753 hits) <-- ANOMALY SPIKE
 [2026-05-02 14:00] ███████████████████████████████████████ (5,718 hits) <-- ANOMALY SPIKE
- [2026-05-06 14:00] ███████████████████████████████████████ (5,705 hits) <-- ANOMALY SPIKE
 ```

 ---

-### 🚀 Quickstart: Local CLI & CI/CD Integration
+### Input Methods: Manual vs. Automated
+The tool requires one of two input methods to function. It will not run without a target list.

-Because these tools operate via single-pass streaming, they require zero environment setup, database indexing, or heavy JVMs. If you have installed GitGalaxy globally via PyPI (`pip install gitgalaxy`), they are ready to run instantly.
-
-#### 1. Local CLI Execution
-
-**Hunt for PII Leaks in a raw database dump:**
+**1. Manual Mode (`-k` or `--keywords`)**
+Best for quick, grep-style tactical hunts. Supply a space-separated list of targets.
 ```bash
-pii-leak-hunter /path/to/massive_database_dump.sql
+terabyte-log-scanner /path/to/production.log -k ERROR TIMEOUT "DATA EXCEPTION"
 ```

-**Stream logs to prove runtime execution of static code:**
+**2. Automated Pipeline Mode (`--input_state`)**
+Best for CI/CD modernization pipelines. Supply a GitGalaxy Intermediate Representation (IR) JSON file. The script will automatically extract the targets from the `known_programs` array to hunt for dead code.
 ```bash
 terabyte-log-scanner /path/to/production.log --input_state ../core/ir_state.json
 ```

-#### 2. GitHub Actions CI/CD Integration
-
-You can automate the sanitization of logs or artifacts before they are uploaded or archived. Create a file in your repository at `.github/workflows/pii-audit.yml`:
-
-```yaml
-name: GitGalaxy Log Sanitization
-
-on:
-  workflow_dispatch: # Can be run manually or on a cron schedule
-
-jobs:
-  gitgalaxy-log-scan:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout Repository
-        uses: actions/checkout@v4
-
-      # (Assuming a previous step generated or downloaded the target log file)
-
-      - name: Run PII Leak Hunter
-        uses: squid-protocol/gitgalaxy@main
-        with:
-          tool: 'pii-leak-hunter'
-          target: './logs/production_dump.sql'
-          args: '--out ./sanitized_logs/'
-
-      - name: Archive Safe Evidence Logs
-        uses: actions/upload-artifact@v4
-        with:
-          name: sanitized-evidence-logs
-          path: ./sanitized_logs/*_pii_leak_evidence.log
+*Required JSON Schema for Automated Mode:*
+```json
+{
+  "analysis": {
+    "known_programs": ["PROGRAM1", "PROGRAM2"]
+  }
+}
 ```

 ---

 ### 🌌 Powered by the blAST Engine (Bypassing LLMs and ASTs)

-This tool is a modular enterprise integration within the broader GitGalaxy architecture. It is driven by our custom mathematical heuristics engine, capable of processing multi-dimensional data at extreme velocity without requiring rigid ASTs or cloud APIs. Read the official documentation to see the structural methodologies powering this high-speed log analysis:
+This suite is driven by our custom deterministic heuristics engine. It processes multi-dimensional data at extreme velocity without requiring rigid ASTs or hallucinating LLMs.

 * 📖 **[The blAST Paradigm (ASTs vs LLMs)](https://squid-protocol.github.io/gitgalaxy/01-03-the-blast-paradigm/)**
-* 📖 **[PII Leak Hunter Architecture](https://squid-protocol.github.io/gitgalaxy/04-06-pii-leak-hunter/)**
-* 📖 **[Terabyte Log Scanner Mechanics](https://squid-protocol.github.io/gitgalaxy/04-07-terabyte-log-scanner/)**
 * 🪐 **[Return to the Main GitGalaxy Hub](https://github.com/squid-protocol/gitgalaxy)**
\ No newline at end of file
diff --git a/gitgalaxy/tools/terabyte_log_scanning/pii_leak_hunter.py b/gitgalaxy/tools/terabyte_log_scanning/pii_leak_hunter.py
index cf1c729c..0b871d6f 100644
--- a/gitgalaxy/tools/terabyte_log_scanning/pii_leak_hunter.py
+++ b/gitgalaxy/tools/terabyte_log_scanning/pii_leak_hunter.py
@@ -63,25 +63,60 @@ def draw_ascii_histogram(time_buckets: dict, keyword: str):
         print(f"  [{time_bucket}] {bar} ({hits:,} hits){alert}")

 def main():
-    parser = argparse.ArgumentParser(description="GitGalaxy PII Data Leak Hunter")
+    # -------------------------------------------------------------------------
+    # 1. CLI ARGUMENT PARSING & DOCUMENTATION
+    # -------------------------------------------------------------------------
+    parser = argparse.ArgumentParser(
+        description="GitGalaxy PII Data Leak Hunter: High-speed streaming parser to detect and mask exposed sensitive data.",
+        formatter_class=argparse.RawTextHelpFormatter,
+        epilog="""
+==============================================================================
+HUNTING CAPABILITIES:
+This engine bypasses standard indexing to stream raw binary logs or database
+dumps. It currently hunts and actively masks the following patterns:
+  - VISA Credit Cards
+  - MASTERCARD Credit Cards
+  - US Social Security Numbers (SSN)
+  - AWS API Keys (AKIA, ASIA, etc.)
+
+Masked evidence logs are safely written to disk without exposing the full PII.
+==============================================================================
+        """
+    )
     parser.add_argument("target", help="Path to the log file or database dump to scan")
     parser.add_argument("--out", type=str, help="Optional: Custom directory to save the safe evidence log")
     args = parser.parse_args()

+    # -------------------------------------------------------------------------
+    # 2. FILE VALIDATION & GUARDRAILS
+    # -------------------------------------------------------------------------
     target_path = Path(args.target).resolve()
-    if not target_path.exists():
-        print(f"Error: Target {target_path} does not exist.")
+    if not target_path.exists() or not target_path.is_file():
+        print(f"\n[!] ERROR: Target file does not exist or is not a file: {target_path}")
         sys.exit(1)

     if args.out:
         out_dir = Path(args.out).resolve()
-        out_dir.mkdir(parents=True, exist_ok=True)
-        results_path = out_dir / f"{target_path.stem}_pii_leak_evidence.log"
     else:
-        results_path = target_path.parent / f"{target_path.stem}_pii_leak_evidence.log"
+        out_dir = target_path.parent
+
+    try:
+        out_dir.mkdir(parents=True, exist_ok=True)
+    except PermissionError:
+        print(f"\n[!] ERROR: Permission denied to create output directory: {out_dir}")
+        sys.exit(1)
+
+    results_path = out_dir / f"{target_path.stem}_pii_leak_evidence.log"

-    file_size_gb = target_path.stat().st_size / (1024**3)
-    print(f"🚨 Tapping into data stream: {target_path.name} ({file_size_gb:.2f} GB)")
+    try:
+        file_size_bytes = target_path.stat().st_size
+        file_size_gb = file_size_bytes / (1024**3)
+        file_size_mb = file_size_bytes / (1024**2)
+    except OSError as e:
+        print(f"\n[!] ERROR: Could not read target file size: {e}")
+        sys.exit(1)
+
+    print(f"🚨 Tapping into data stream: {target_path.name} ({file_size_gb:.2f} GB / {file_size_mb:.2f} MB)")
     print(f"🛡️ Masking enabled. Streaming safe evidence to: {results_path.name}")

     ts_pattern = re.compile(br'(\d{4}-\d{2}-\d{2}[T\s]\d{2}|\b[A-Z][a-z]{2}\s+\d{1,2}\s\d{2})')
@@ -89,30 +124,37 @@ def main():

     start_time = time.time()

-    # 2. The Memory Shield: Read binary in, check regex, stream masked text out
-    with open(target_path, 'rb') as f_in, open(results_path, 'w', encoding='utf-8') as f_out:
-        for line in f_in:
-            hit_found = False
-            for pii_type, pattern in PII_PATTERNS.items():
-                if pattern.search(line):
-                    # Only decode the line if a physical hit is detected to save CPU cycles
-                    if not hit_found:
-                        decoded_line = line.decode('utf-8', errors='ignore').strip()
-                        safe_line = mask_pii(decoded_line)
-                        f_out.write(f"[{pii_type}] {safe_line}\n")
-                        hit_found = True  # Prevent duplicate writes if a line has multiple PII types
-
-                    ts_match = ts_pattern.search(line)
-                    bucket = ts_match.group(1).decode('utf-8', errors='ignore') + ":00" if ts_match else "Unknown Time"
-                    histograms[pii_type][bucket] += 1
+    # -------------------------------------------------------------------------
+    # 3. HIGH-SPEED SCANNING (The Memory Shield)
+    # -------------------------------------------------------------------------
+    try:
+        with open(target_path, 'rb') as f_in, open(results_path, 'w', encoding='utf-8') as f_out:
+            for line in f_in:
+                hit_found = False
+                for pii_type, pattern in PII_PATTERNS.items():
+                    if pattern.search(line):
+                        # Only decode the line if a physical hit is detected to save CPU cycles
+                        if not hit_found:
+                            decoded_line = line.decode('utf-8', errors='ignore').strip()
+                            safe_line = mask_pii(decoded_line)
+                            f_out.write(f"[{pii_type}] {safe_line}\n")
+                            hit_found = True  # Prevent duplicate writes if a line has multiple PII types
+
+                        ts_match = ts_pattern.search(line)
+                        bucket = ts_match.group(1).decode('utf-8', errors='ignore') + ":00" if ts_match else "Unknown Time"
+                        histograms[pii_type][bucket] += 1
+    except IOError as e:
+        print(f"\n[!] FATAL I/O ERROR during streaming: {e}")
+        sys.exit(1)

     time_elapsed = time.time() - start_time

-    # 3. Print the Visual Dashboards
+    # -------------------------------------------------------------------------
+    # 4. REPORTING & DASHBOARDS
+    # -------------------------------------------------------------------------
     for kw in PII_PATTERNS.keys():
         draw_ascii_histogram(histograms[kw], kw)

-    # 4. Calculate totals for the Executive Summary
     total_counts = {kw: sum(buckets.values()) for kw, buckets in histograms.items()}
     max_total = max(total_counts.values()) if total_counts.values() else 0
@@ -129,9 +171,20 @@ def main():
         print("   ✅ Clean scan. No Social Security, Credit Card, or AWS Keys detected.")
     print("-" * 75)

-    processing_speed = file_size_gb / time_elapsed if time_elapsed > 0 else 0
+
+    # Safely calculate processing speed depending on file size to prevent math errors
+    if time_elapsed > 0:
+        if file_size_gb > 0.1:
+            speed = file_size_gb / time_elapsed
+            speed_str = f"{speed:.3f} GB/s"
+        else:
+            speed = file_size_mb / time_elapsed
+            speed_str = f"{speed:.2f} MB/s"
+    else:
+        speed_str = "Instant"
+
     print(f"   ✅ Scan complete. Sliced through {target_path.name} in {time_elapsed:.2f} seconds.")
-    print(f"   ⚡ Processing Velocity: {processing_speed:.3f} GB/s")
+    print(f"   ⚡ Processing Velocity: {speed_str}")
     print(f"   📁 Safe Evidence Log: {results_path.resolve()}")
     print("="*75 + "\n")

diff --git a/gitgalaxy/tools/terabyte_log_scanning/terabyte_log_scanner.py b/gitgalaxy/tools/terabyte_log_scanning/terabyte_log_scanner.py
index 90633c2b..dfe3cdc6 100644
--- a/gitgalaxy/tools/terabyte_log_scanning/terabyte_log_scanner.py
+++ b/gitgalaxy/tools/terabyte_log_scanning/terabyte_log_scanner.py
@@ -5,15 +5,18 @@
 # ==============================================================================
 import argparse
 import sys
-import os
 import re
 import time
-import json # <-- ADD THIS
+import json
 from collections import defaultdict
 from pathlib import Path

 def draw_ascii_histogram(time_buckets: dict, keyword: str):
-    """Draws a dynamically scaled ASCII histogram, showing only top spikes if massive."""
+    """
+    Draws a dynamically scaled ASCII histogram.
+    If the dataset is massive, it filters to show only the highest volume spikes
+    to prevent terminal flooding.
+    """
     if not time_buckets:
         return
@@ -24,7 +27,7 @@ def draw_ascii_histogram(time_buckets: dict, keyword: str):
     avg_hits = sum(time_buckets.values()) / len(time_buckets)
     anomaly_threshold = avg_hits * 3

-    # THE UX FIX: If there are too many buckets, only show the Top 15 worst ones
+    # UX Safeguard: If there are too many buckets, only show the Top 15 worst ones
     if len(time_buckets) > 15:
         print("   (Filtering to Top 15 Highest Volume Spikes)")
         # Sort by highest hits, grab top 15, then resort chronologically for the graph
@@ -34,105 +37,172 @@
     display_buckets = dict(sorted(time_buckets.items()))

     for time_bucket, hits in display_buckets.items():
+        # Calculate bar length safely
         bar_len = int((hits / max_hits) * max_bar_width) if max_hits > 0 else 0
         bar = "█" * max(1, bar_len)
+        # Flag statistical anomalies visually
         alert = " <-- ANOMALY SPIKE" if hits >= anomaly_threshold and hits > 10 else ""
         print(f"  [{time_bucket}] {bar} ({hits:,} hits){alert}")

 def main():
-    parser = argparse.ArgumentParser(description="GitGalaxy Mega Log Parser")
+    # -------------------------------------------------------------------------
+    # 1. CLI ARGUMENT PARSING & DOCUMENTATION
+    # -------------------------------------------------------------------------
+    parser = argparse.ArgumentParser(
+        description="GitGalaxy Mega Log Parser: High-speed, single-pass log analyzer with ASCII time-series histograms.",
+        formatter_class=argparse.RawTextHelpFormatter,
+        epilog="""
+==============================================================================
+JSON IR State Structure:
+If using --input_state, the script expects a GitGalaxy Intermediate
+Representation (IR) JSON file. It specifically targets the 'known_programs'
+array to hunt for dead code and execution volumes.
+
+Expected JSON Schema:
+{
+    "analysis": {
+        "known_programs": ["PROGRAM1", "PROGRAM2"]
+    }
+}
+==============================================================================
+        """
+    )
     parser.add_argument("target", help="Path to the log file (Translated ASCII SMF)")
-    # Make keywords optional, and add an input file argument
-    parser.add_argument("-k", "--keywords", nargs="+", help="Keywords to search for")
+    parser.add_argument("-k", "--keywords", nargs="+", help="Keywords to search for manually (e.g., -k PGM1 PGM2)")
     parser.add_argument("--input_state", type=str, help="Path to GitGalaxy ir_state.json to auto-extract targets")
     parser.add_argument("--out", type=str, help="Optional: Custom directory to save the results log")
     args = parser.parse_args()

+    # Validate target log file exists before doing any work
     target_path = Path(args.target).resolve()
-    if not target_path.exists():
-        print(f"Error: Target {target_path} does not exist.")
+    if not target_path.exists() or not target_path.is_file():
+        print(f"\n[!] ERROR: Target log file does not exist or is not a file: {target_path}")
         sys.exit(1)

-    # =====================================================================
-    # ADAPTATION 1: The Input Handshake
-    # =====================================================================
+    # -------------------------------------------------------------------------
+    # 2. INPUT HANDSHAKE & VALIDATION (No Silent Failures)
+    # -------------------------------------------------------------------------
     search_targets = []
-    dynamic_call_hunts = {}

     if args.input_state:
-        state_path = Path(args.input_state)
-        if state_path.exists():
-            with open(state_path, 'r') as f:
+        state_path = Path(args.input_state).resolve()
+        if not state_path.exists():
+            print(f"\n[!] ERROR: Input state JSON file not found: {state_path}")
+            sys.exit(1)
+
+        try:
+            with open(state_path, 'r', encoding='utf-8') as f:
                 ir_state = json.load(f)
-                # Extract all known programs to check for the 0-Hit dead code rule
-                search_targets = ir_state.get('analysis', {}).get('known_programs', [])
-                print(f"📡 Loaded {len(search_targets)} targets from {state_path.name}")
+
+            # Strict Schema Validation
+            if not isinstance(ir_state, dict):
+                raise ValueError("The root of the JSON file must be an object {}.")
+            if 'analysis' not in ir_state or 'known_programs' not in ir_state['analysis']:
+                raise ValueError("JSON is missing the required ['analysis']['known_programs'] path.")
+
+            search_targets = ir_state['analysis']['known_programs']
+
+            if not isinstance(search_targets, list) or not search_targets:
+                print("\n[!] WARNING: 'known_programs' array is empty or invalid. Nothing to search.")
+                sys.exit(0)
+
+            print(f"📡 Loaded {len(search_targets)} targets from {state_path.name}")
+
+        except json.JSONDecodeError as e:
+            print(f"\n[!] ERROR: Invalid JSON format in {state_path.name}:\n    {e}")
+            sys.exit(1)
+        except Exception as e:
+            print(f"\n[!] ERROR: Failed to parse input state:\n    {e}")
+            sys.exit(1)
+
     elif args.keywords:
         search_targets = args.keywords
     else:
-        print("Error: Must provide either -k keywords or --input_state json.")
+        print("\n[!] ERROR: You must provide targets using either -k/--keywords or --input_state.")
+        parser.print_help()
         sys.exit(1)

-    # =====================================================================
-    # ADAPTATION 2: Mainframe Execution Regex
-    # We prefix targets with common SMF/JCL execution markers to avoid noise
-    # =====================================================================
+    # -------------------------------------------------------------------------
+    # 3. REGEX COMPILATION & FILE PREPARATION
+    # -------------------------------------------------------------------------
     keyword_patterns = {}
     for kw in search_targets:
-        # Example: looking for "PGM=NAME" or "STARTED NAME"
-        pattern_str = fr"{kw}"
-        keyword_patterns[kw] = re.compile(pattern_str.encode('utf-8'), re.IGNORECASE)
+        # Pre-compile each keyword as a byte-level regex for fast binary scanning.
+        try:
+            keyword_patterns[kw] = re.compile(kw.encode('utf-8'), re.IGNORECASE)
+        except re.error as e:
+            print(f"\n[!] ERROR: Invalid regex in keyword '{kw}': {e}")
+            sys.exit(1)

     ts_pattern = re.compile(br'(\d{4}-\d{2}-\d{2}[T\s]\d{2}|\b[A-Z][a-z]{2}\s+\d{1,2}\s\d{2})')
     histograms = {kw: defaultdict(int) for kw in search_targets}

+    # Determine output paths
     if args.out:
         out_dir = Path(args.out).resolve()
-        out_dir.mkdir(parents=True, exist_ok=True)
-        results_path = out_dir / f"{target_path.stem}_results.txt"
     else:
-        results_path = target_path.parent / f"{target_path.stem}_results.txt"
+        out_dir = target_path.parent
+
+    try:
+        out_dir.mkdir(parents=True, exist_ok=True)
+    except PermissionError:
+        print(f"\n[!] ERROR: Permission denied to create output directory: {out_dir}")
+        sys.exit(1)
+
+    results_path = out_dir / f"{target_path.stem}_results.txt"
+    sidecar_path = out_dir / "dynamic_telemetry.json"

     start_time = time.time()
     print(f"🚀 Scanning {target_path.name} for {len(search_targets)} keywords...")

-    # 2. The Memory Shield
-    with open(target_path, 'rb') as f_in, open(results_path, 'w', encoding='utf-8') as f_out:
-        for line in f_in:
-            for kw, pattern in keyword_patterns.items():
-                if pattern.search(line):
-                    decoded_line = line.decode('utf-8', errors='ignore').strip()
-                    ts_match = ts_pattern.search(line)
-                    bucket = ts_match.group(1).decode('utf-8', errors='ignore') + ":00" if ts_match else "Unknown Time"
-                    histograms[kw][bucket] += 1
-                    f_out.write(f"{decoded_line}\n")
-                    break
+    # -------------------------------------------------------------------------
+    # 4. HIGH-SPEED SCANNING (The Memory Shield)
+    # -------------------------------------------------------------------------
+    try:
+        with open(target_path, 'rb') as f_in, open(results_path, 'w', encoding='utf-8') as f_out:
+            for line in f_in:
+                for kw, pattern in keyword_patterns.items():
+                    if pattern.search(line):
+                        decoded_line = line.decode('utf-8', errors='ignore').strip()
+                        ts_match = ts_pattern.search(line)
+
+                        # Bucket by hour
+                        bucket = ts_match.group(1).decode('utf-8', errors='ignore') + ":00" if ts_match else "Unknown Time"
+                        histograms[kw][bucket] += 1
+
+                        f_out.write(f"{decoded_line}\n")
+                        break  # Stop checking keywords once a hit is found on this line
+    except IOError as e:
+        print(f"\n[!] FATAL I/O ERROR during scanning: {e}")
+        sys.exit(1)

     time_elapsed = time.time() - start_time

+    # -------------------------------------------------------------------------
+    # 5. REPORTING & SIDECAR GENERATION
+    # -------------------------------------------------------------------------
     for kw, buckets in histograms.items():
         draw_ascii_histogram(buckets, kw)

     print(f"\n✅ Scan completed in {time_elapsed:.2f} seconds.")
     print(f"📄 Filtered results saved to: {results_path}")

+    # Calculate total hits for the JSON sidecar
     total_counts = {kw: sum(buckets.values()) for kw, buckets in histograms.items()}
-
-    # =====================================================================
-    # ADAPTATION 3: The Output Handshake (JSON Sidecar)
-    # =====================================================================
-    sidecar_path = target_path.parent / "dynamic_telemetry.json"
-
     telemetry_payload = {
         "execution_counts": total_counts,
-        "resolved_dynamic_calls": {} # Placeholder for advanced chronological resolution
+        "resolved_dynamic_calls": {}  # Reserved for advanced chronological resolution
     }

-    with open(sidecar_path, 'w') as f_json:
-        json.dump(telemetry_payload, f_json, indent=4)
-
-    print(f" 💾 JSON State Sidecar written to: {sidecar_path.resolve()}")
+    try:
+        with open(sidecar_path, 'w', encoding='utf-8') as f_json:
+            json.dump(telemetry_payload, f_json, indent=4)
+        print(f"💾 JSON State Sidecar written to: {sidecar_path}")
+    except IOError as e:
+        print(f"\n[!] ERROR: Failed to write telemetry sidecar: {e}")
+
     print("="*75 + "\n")

 if __name__ == "__main__":
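
---

For downstream consumers of the scanner's output: a minimal, illustrative sketch (a hypothetical reader script, not part of this diff) that turns the `dynamic_telemetry.json` sidecar written above into a zero-hit dead-code report:

```python
import json

# Load the sidecar emitted by terabyte_log_scanner.py.
with open("dynamic_telemetry.json", "r", encoding="utf-8") as f:
    telemetry = json.load(f)

# Every search target appears in execution_counts (zero-hit targets included),
# so a count of 0 marks a program that never surfaced in the scanned logs.
counts = telemetry["execution_counts"]
dead = sorted(kw for kw, hits in counts.items() if hits == 0)

print(f"{len(dead)} of {len(counts)} known programs had zero runtime hits:")
for program in dead:
    print(f"  [ZERO-HIT] {program}")
```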