-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathparser.py
More file actions
75 lines (63 loc) · 2.68 KB
/
parser.py
File metadata and controls
75 lines (63 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Log Parsing Logic
import json
import re
import os
def validate_source(log_source):
    """
    Resolve a log source into raw log text.

    Kept for backward compatibility: callers may pass either the log text
    itself or the path to an existing ``.json`` file whose top-level object
    stores the logs under the ``raw_logs`` key.

    Args:
        log_source: Raw log content, or a path to a ``.json`` file.

    Returns:
        The log text. For a JSON file this is the ``raw_logs`` value: list
        items are joined with newlines, a string has its CRLF line endings
        normalised to LF. Any other input is returned unchanged.

    Raises:
        ValueError: If the JSON file cannot be decoded, its top level is not
            an object, ``raw_logs`` is missing or empty, or ``raw_logs`` is
            neither a list nor a string.
    """
    # Check the cheap suffix test first so ordinary log text never reaches
    # the filesystem lookup.
    is_json_file = (
        isinstance(log_source, str)
        and log_source.lower().endswith('.json')
        and os.path.isfile(log_source)
    )
    if not is_json_file:
        # Plain log content (or anything else): hand it back untouched.
        return log_source

    # JSON interchange files are UTF-8; do not depend on the locale default.
    with open(log_source, 'r', encoding='utf-8') as file:
        try:
            data = json.load(file)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON file: {e}") from e

    if not isinstance(data, dict):
        raise ValueError("Invalid JSON structure")

    # A missing key and an empty value are both reported as "no raw_logs".
    raw_logs = data.get('raw_logs')
    if not raw_logs:
        raise ValueError("No 'raw_logs' key found in JSON file")

    if isinstance(raw_logs, list):
        return "\n".join(str(line) for line in raw_logs)
    if isinstance(raw_logs, str):
        return raw_logs.replace('\r\n', '\n')
    raise ValueError("Invalid 'raw_logs' format")
def parse_log(log_source):
    """
    Split a log source into parsed entries and lines that failed to parse.

    Args:
        log_source: Passed to validate_source() to obtain the log text.

    Returns:
        tuple: ``(log_entries, error_entries)``. ``log_entries`` is a list of
        dicts with the string-valued keys ``ipaddress``, ``timestamp``,
        ``method``, ``path``, ``protocol``, ``status_code``, ``bytes_sent``,
        ``referrer`` and ``user_agent``. ``error_entries`` is a list of the
        lines that did not match the expected format.
    """
    text = validate_source(log_source)

    # Expected line shape:
    #   ip - - [timestamp] "METHOD path protocol" status bytes "referrer" "agent"
    # Each line is matched on its own, so the MULTILINE flag has no effect here.
    line_format = re.compile(
        r'^(?P<ipaddress>\d{1,3}(?:\.\d{1,3}){3}) \- - \[(?P<timestamp>[^\]]+)\] "(?P<method>[A-Z]+) (?P<path>[^"]+) (?P<protocol>[^"]+)" (?P<status_code>\d{3}) (?P<bytes_sent>\d+) "(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)"$',
        re.MULTILINE
    )

    parsed = []
    rejected = []
    for raw_line in text.splitlines():
        hit = line_format.match(raw_line)
        if hit is None:
            # Keep the offending line verbatim for the caller to inspect.
            rejected.append(raw_line)
            continue
        # Every field is a named group, so groupdict() is the full entry.
        parsed.append(hit.groupdict())
    return parsed, rejected