Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions diagnostic/build-2b54872c.json

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions diagnostic/build-d52dc82e.json

Large diffs are not rendered by default.

79 changes: 79 additions & 0 deletions tests/test_log_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
import os
import tempfile
import unittest
import sys
from pathlib import Path

# Add the tools directory to the path so we can import log_aggregator
sys.path.insert(0, str(Path(__file__).parent.parent / "tools"))

from log_aggregator import LogAggregator

class TestLogAggregator(unittest.TestCase):
def setUp(self):
self.aggregator = LogAggregator()
self.temp_dir = tempfile.TemporaryDirectory()

def tearDown(self):
self.temp_dir.cleanup()

def test_valid_json_log(self):
log_content = '{"timestamp": 1704110400, "level": "info", "message": "Test"}\n'
log_path = os.path.join(self.temp_dir.name, "valid.log")
with open(log_path, 'w') as f:
f.write(log_content)

count = self.aggregator.process_file(log_path)
self.assertEqual(count, 1)
self.assertEqual(len(self.aggregator.parse_failures), 0)

def test_malformed_json_log(self):
# A malformed JSON line that will trigger JSONDecodeError
log_content = '{"timestamp": 1704110400, "level": "info", "message": "Test"\n'
log_path = os.path.join(self.temp_dir.name, "invalid.log")
with open(log_path, 'w') as f:
f.write(log_content)

count = self.aggregator.process_file(log_path)
self.assertEqual(count, 0)
self.assertEqual(len(self.aggregator.parse_failures), 1)

failure = self.aggregator.parse_failures[0]
self.assertEqual(failure["file"], log_path)
self.assertEqual(failure["line"], 1)
self.assertEqual(failure["parser"], "JSONLogParser")
self.assertIn("JSON Parse Error", failure["error"])

def test_empty_lines_skipped(self):
log_content = '\n\n'
log_path = os.path.join(self.temp_dir.name, "empty.log")
with open(log_path, 'w') as f:
f.write(log_content)

count = self.aggregator.process_file(log_path)
self.assertEqual(count, 0)
self.assertEqual(len(self.aggregator.parse_failures), 0)

def test_export_parse_error_report(self):
log_content = '{invalid}\n'
log_path = os.path.join(self.temp_dir.name, "invalid.log")
with open(log_path, 'w') as f:
f.write(log_content)

self.aggregator.process_file(log_path)

report_path = os.path.join(self.temp_dir.name, "report.json")
self.aggregator.export_parse_error_report(report_path)

self.assertTrue(os.path.exists(report_path))
with open(report_path, 'r') as f:
report = json.load(f)

self.assertEqual(report["total_failures"], 1)
self.assertEqual(len(report["failures"]), 1)
self.assertEqual(report["failures"][0]["parser"], "JSONLogParser")
self.assertEqual(report["failures"][0]["line"], 1)

if __name__ == '__main__':
unittest.main()
88 changes: 61 additions & 27 deletions tools/log_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ class JSONLogParser(LogParser):
"""Parses structured JSON log lines."""

def parse(self, line: str) -> Optional[Dict[str, Any]]:
line = line.strip()
if not line:
return None
try:
entry = json.loads(line.strip())
entry = json.loads(line)
if not isinstance(entry, dict):
return None
return {
Expand All @@ -129,8 +132,8 @@ def parse(self, line: str) -> Optional[Dict[str, Any]]:
'fields': entry,
'format': 'json',
}
except json.JSONDecodeError:
return None
except json.JSONDecodeError as e:
raise ValueError(f"JSON Parse Error: {str(e)}")


class TextLogParser(LogParser):
Expand Down Expand Up @@ -212,20 +215,44 @@ def __init__(self):
self.error_patterns: Counter = Counter()
self.top_errors: Counter = Counter()
self.errors_by_service: Dict[str, List[str]] = defaultdict(list)
self.parse_failures: List[Dict[str, Any]] = []

def _record_parse_failure(self, filepath: str, line_number: int, parser_type: str, error_msg: str):
self.parse_failures.append({
"file": filepath,
"line": line_number,
"parser": parser_type,
"error": error_msg
})

def export_parse_error_report(self, output_path: str):
report = {
"total_failures": len(self.parse_failures),
"failures": self.parse_failures
}
with open(output_path, 'w') as f:
json.dump(report, f, indent=2)
logger.info(f"Parse error report exported to {output_path}")

def process_file(self, filepath: str) -> int:
parsed_count = 0
try:
if filepath.endswith('.gz'):
with gzip.open(filepath, 'rt', errors='replace') as f:
for line in f:
if self._parse_line(line):
for line_num, line in enumerate(f, 1):
success, err_parser, err_msg = self._parse_line(line)
if success:
parsed_count += 1
elif err_msg:
self._record_parse_failure(filepath, line_num, err_parser, err_msg)
else:
with open(filepath, 'r', errors='replace') as f:
for line in f:
if self._parse_line(line):
for line_num, line in enumerate(f, 1):
success, err_parser, err_msg = self._parse_line(line)
if success:
parsed_count += 1
elif err_msg:
self._record_parse_failure(filepath, line_num, err_parser, err_msg)
except Exception as e:
logger.error(f"Error processing {filepath}: {e}")

Expand All @@ -240,27 +267,30 @@ def process_directory(self, dirpath: str, pattern: str = "*.log") -> int:
logger.debug(f" {filepath.name}: {count} entries")
return total

def _parse_line(self, line: str) -> bool:
def _parse_line(self, line: str) -> Tuple[bool, Optional[str], Optional[str]]:
for parser in self.parsers:
entry = parser.parse(line)
if entry:
self.entries.append(entry)
ts = entry.get('timestamp')
if ts:
hour = datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:00')
self.hourly_counts[hour] += 1
level = entry.get('level', 'unknown').lower()
self.level_counts[level] += 1
service = entry.get('service', 'unknown')
self.service_counts[service] += 1
if level in ('error', 'critical'):
msg = entry.get('message', '')
if len(msg) > 200:
msg = msg[:200]
self.errors_by_service[service].append(msg)
self.error_patterns[msg] += 1
return True
return False
try:
entry = parser.parse(line)
if entry:
self.entries.append(entry)
ts = entry.get('timestamp')
if ts:
hour = datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:00')
self.hourly_counts[hour] += 1
level = entry.get('level', 'unknown').lower()
self.level_counts[level] += 1
service = entry.get('service', 'unknown')
self.service_counts[service] += 1
if level in ('error', 'critical'):
msg = entry.get('message', '')
if len(msg) > 200:
msg = msg[:200]
self.errors_by_service[service].append(msg)
self.error_patterns[msg] += 1
return True, None, None
except ValueError as e:
return False, parser.__class__.__name__, str(e)
return False, None, None

def get_summary(self) -> Dict[str, Any]:
return {
Expand Down Expand Up @@ -411,6 +441,7 @@ def parse_args():
parser.add_argument("--output", "-o", default="log_report.json", help="Output file path")
parser.add_argument("--format", choices=["json", "csv", "html"], default="json", help="Output format")
parser.add_argument("--search", help="Search for a string in logs")
parser.add_argument("--parse-error-report", help="Output path for sanitized parse-error report")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
return parser.parse_args()

Expand Down Expand Up @@ -459,6 +490,9 @@ def main():
else:
aggregator.export_json(args.output)

if args.parse_error_report:
aggregator.export_parse_error_report(args.parse_error_report)

return 0


Expand Down