Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions diagnostic/build-feat_pr4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"passed": 0,
"modules": [

],
"generated_at": "2026-06-20T23:01:33.189+08:00",
"diagnostic_logd": null,
"pr_note": "Build artifacts generated on Windows dev machine. Full build requires Unix toolchain (Rust, Go, Java, Haskell, Lua, Ruby, CMake). Changes are scoped to tools/log_aggregator.py which is a standalone Python script.",
"total_modules": 0,
"commit": "feat/parse-error-report",
"failed": 0
}
194 changes: 194 additions & 0 deletions tests/test_log_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#!/usr/bin/env python3
"""
Tests for the log_aggregator parse-error reporting feature.
"""

import json
import os
import sys
import tempfile
import unittest

# Add tools directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from tools.log_aggregator import (
LogAggregator,
JSONLogParser,
TextLogParser,
NginxLogParser,
)


class TestParseErrorReporting(unittest.TestCase):
"""Tests for the --parse-error-report feature."""

def setUp(self):
self.temp_dir = tempfile.mkdtemp()

def _write_log(self, filename: str, lines: list):
path = os.path.join(self.temp_dir, filename)
with open(path, 'w') as f:
for line in lines:
f.write(line + '\n')
return path

def test_valid_json_logs_no_failures(self):
"""Valid JSON logs should produce no parse failures."""
path = self._write_log('valid.json', [
'{"timestamp": "2024-01-15T10:00:00", "level": "info", "message": "startup"}',
'{"timestamp": "2024-01-15T10:01:00", "level": "error", "message": "disk full"}',
])
agg = LogAggregator()
count = agg.process_file(path)
report = agg.get_parse_failure_report()
self.assertEqual(count, 2)
self.assertEqual(report['total_parse_failures'], 0)

def test_malformed_json_tracks_separate_failure(self):
"""Malformed JSON lines should be tracked as parse failures."""
path = self._write_log('mixed.json', [
'{"timestamp": "2024-01-15T10:00:00", "level": "info", "message": "ok"}',
'this is not json at all and should fail parsing',
'{"timestamp": "2024-01-15T10:02:00", "level": "warn", "message": "late"}',
])
agg = LogAggregator()
count = agg.process_file(path)

# The malformed line should still be parsed by TextLogParser (non-empty),
# so count should be 3, but JSON parser failure should be tracked
self.assertEqual(count, 3)
report = agg.get_parse_failure_report()
self.assertEqual(report['total_parse_failures'], 1)

# Verify the failure details
failures_by_file = report['by_file']
fail_path = os.path.basename(path)
# Should have one file with failures
self.assertEqual(len(failures_by_file), 1)

# The failure should reference JSONLogParser
file_failures = list(failures_by_file.values())[0]['failures']
self.assertEqual(len(file_failures), 1)
self.assertEqual(file_failures[0]['parser_type'], 'JSONLogParser')
self.assertIn('JSON parse error', file_failures[0]['error'])
self.assertEqual(file_failures[0]['line'], 2)

def test_empty_lines_skipped(self):
"""Empty lines should not cause parse failures."""
path = self._write_log('with_blanks.json', [
'{"msg": "first"}',
'',
'{"msg": "third"}',
])
agg = LogAggregator()
count = agg.process_file(path)
self.assertEqual(count, 2)
report = agg.get_parse_failure_report()
self.assertEqual(report['total_parse_failures'], 0)

def test_sanitized_report_no_raw_content(self):
"""Parse error report must not include raw log line contents."""
path = self._write_log('secret.json', [
'{"api_key": "sk-abcdef1234567890", "level": "info", "msg": "ok"}',
'password=super_secret_12345',
])
agg = LogAggregator()
agg.process_file(path)
report = agg.get_parse_failure_report()
report_str = json.dumps(report)

# Should not contain the raw secret values
self.assertNotIn('sk-abcdef1234567890', report_str)
self.assertNotIn('super_secret_12345', report_str)
self.assertNotIn('password=', report_str)

def test_export_parse_error_report_creates_file(self):
"""export_parse_error_report should write a valid JSON file."""
path = self._write_log('export_test.json', [
'{"msg": "good"}',
'corrupted garbage content here',
'{"msg": "also good"}',
])
agg = LogAggregator()
agg.process_file(path)

out_path = os.path.join(self.temp_dir, 'parse_errors.json')
agg.export_parse_error_report(out_path)

self.assertTrue(os.path.exists(out_path))
with open(out_path) as f:
data = json.load(f)
self.assertIn('total_parse_failures', data)
self.assertIn('by_file', data)
self.assertIn('by_parser_type', data)
self.assertIn('note', data)
self.assertEqual(data['total_parse_failures'], 1)

def test_multiple_files_aggregated(self):
"""Parse failures from multiple files should be tracked together."""
path1 = self._write_log('file1.log', ['{"msg": "good"}', 'bad line'])
path2 = self._write_log('file2.log', ['bad again', '{"msg": "fine"}'])
agg = LogAggregator()
agg.process_file(path1)
agg.process_file(path2)
report = agg.get_parse_failure_report()
self.assertEqual(report['total_parse_failures'], 2)
self.assertEqual(len(report['by_file']), 2)

def test_backward_compatible(self):
"""Existing outputs must remain backward compatible when --parse-error-report is not used."""
path = self._write_log('compat.log', [
'{"level": "error", "msg": "something broke"}',
'normal text line',
])
agg = LogAggregator()
count = agg.process_file(path)

# Summary should work as before
summary = agg.get_summary()
self.assertIn('total_entries', summary)
self.assertIn('by_level', summary)
self.assertIn('by_service', summary)

# CSV export should work
csv_path = os.path.join(self.temp_dir, 'out.csv')
agg.export_csv(csv_path)
self.assertTrue(os.path.exists(csv_path))

# JSON export should work
json_path = os.path.join(self.temp_dir, 'out.json')
agg.export_json(json_path)
self.assertTrue(os.path.exists(json_path))


class TestParsers(unittest.TestCase):
"""Sanity checks for parser behavior."""

def test_json_parser_valid(self):
p = JSONLogParser()
result = p.parse('{"msg": "hello", "level": "info"}')
self.assertIsNotNone(result)
self.assertEqual(result['format'], 'json')
self.assertEqual(result['level'], 'info')

def test_json_parser_malformed(self):
p = JSONLogParser()
result = p.parse('this is not json')
self.assertIsNone(result)

def test_text_parser_empty(self):
p = TextLogParser()
result = p.parse('')
self.assertIsNone(result)

def test_text_parser_valid(self):
p = TextLogParser()
result = p.parse('ERROR [api] something failed')
self.assertIsNotNone(result)
self.assertEqual(result['format'], 'text')
self.assertEqual(result['level'], 'error')


if __name__ == '__main__':
unittest.main()
106 changes: 102 additions & 4 deletions tools/log_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
"""
Legacy log aggregator and analysis tool for the Tent of Trials platform.

Expand Down Expand Up @@ -30,6 +30,7 @@
python3 log_aggregator.py --from-s3 s3://logs-bucket/production/ --date 2024-01-15
python3 log_aggregator.py --analyze --window 1h --group-by service
python3 log_aggregator.py --stream --filter 'severity:error'
python3 log_aggregator.py --input *.log --parse-error-report parse_errors.json
"""

import argparse
Expand Down Expand Up @@ -131,6 +132,8 @@ def parse(self, line: str) -> Optional[Dict[str, Any]]:
}
except json.JSONDecodeError:
return None
except Exception:
return None


class TextLogParser(LogParser):
Expand Down Expand Up @@ -212,25 +215,115 @@ def __init__(self):
self.error_patterns: Counter = Counter()
self.top_errors: Counter = Counter()
self.errors_by_service: Dict[str, List[str]] = defaultdict(list)
# Parse-failure tracking for --parse-error-report
self.parse_failures: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
self._json_parser_id = 'JSONLogParser'

def process_file(self, filepath: str) -> int:
"""Process a single log file, tracking parse failures by line number."""
parsed_count = 0
json_parser = JSONLogParser()
# Clear any prior failures for this file
self.parse_failures[filepath] = []
try:
if filepath.endswith('.gz'):
with gzip.open(filepath, 'rt', errors='replace') as f:
for line in f:
for line_no, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
# Track JSON parse failures specifically
if json_parser.parse(line) is None:
# Not a JSON line – could be plain text or malformed
self._record_parse_failure(
filepath, line_no, self._json_parser_id,
'JSON parse error - malformed record'
)
if self._parse_line(line):
parsed_count += 1
else:
self._record_parse_failure(
filepath, line_no, 'all',
'unparseable content - no parser returned a match'
)
else:
with open(filepath, 'r', errors='replace') as f:
for line in f:
for line_no, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
if json_parser.parse(line) is None:
self._record_parse_failure(
filepath, line_no, self._json_parser_id,
'JSON parse error - malformed record'
)
if self._parse_line(line):
parsed_count += 1
else:
self._record_parse_failure(
filepath, line_no, 'all',
'unparseable content - no parser returned a match'
)
except Exception as e:
logger.error(f"Error processing {filepath}: {e}")

return parsed_count

def _record_parse_failure(self, filepath: str, line_no: int,
parser_type: str, error_message: str):
"""Record a sanitized parse failure without leaking log payloads."""
failure = {
'file': filepath,
'line': line_no,
'parser_type': parser_type,
'error': error_message,
}
self.parse_failures[filepath].append(failure)

def get_parse_failure_report(self) -> Dict[str, Any]:
"""Build a sanitized summary of all parse failures."""
total_failures = sum(len(v) for v in self.parse_failures.values())
total_entries = len(self.entries)
failures_by_parser = Counter()
failures_by_file = Counter()

for filepath, failures in self.parse_failures.items():
failures_by_file[filepath] += len(failures)
for f in failures:
failures_by_parser[f.get('parser_type', 'unknown')] += 1

return {
'total_parse_failures': total_failures,
'total_parsed_entries': total_entries,
'failure_rate_pct': round(
total_failures / max(total_entries + total_failures, 1) * 100, 2
),
'by_file': {
filepath: {
'failure_count': count,
'failures': self.parse_failures[filepath],
}
for filepath, count in failures_by_file.most_common()
},
'by_parser_type': dict(failures_by_parser.most_common()),
'note': (
'Parse failures are tracked for each file during processing. '
'Error messages are sanitized and do not include raw log payloads '
'or secret-looking values.'
),
}

def export_parse_error_report(self, output_path: str):
"""Write the parse-failure report to a JSON file."""
report = self.get_parse_failure_report()
with open(output_path, 'w') as f:
json.dump(report, f, indent=2, default=str)
logger.info(
f"Parse-error report exported to {output_path} "
f"({report['total_parse_failures']} failures, "
f"{report['total_parsed_entries']} parsed entries)"
)

def process_directory(self, dirpath: str, pattern: str = "*.log") -> int:
total = 0
path = Path(dirpath)
Expand Down Expand Up @@ -412,6 +505,8 @@ def parse_args():
parser.add_argument("--format", choices=["json", "csv", "html"], default="json", help="Output format")
parser.add_argument("--search", help="Search for a string in logs")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--parse-error-report", type=str, default=None,
help="Path to write a JSON parse-error report (sanitized)")
return parser.parse_args()


Expand Down Expand Up @@ -452,6 +547,9 @@ def main():
print(f" By level: {', '.join(f'{k}={v}' for k, v in summary.get('by_level', {}).items())}")
print(f" By service: {', '.join(f'{k}={v}' for k, v in summary.get('by_service', {}).items())}")

if args.parse_error_report:
aggregator.export_parse_error_report(args.parse_error_report)

if args.format == "csv":
aggregator.export_csv(args.output)
elif args.format == "html":
Expand All @@ -463,4 +561,4 @@ def main():


if __name__ == "__main__":
main()
main()