Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.venv
test_logs.txt
apache_logs.db
.ipynb_checkpoints
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# 📊 LogForge Apache Log ETL

## 📁 Overview
LogForge is an Apache log parser and analyzer pipeline that extracts, parses, stores, and summarizes log data using Python and SQLite.

---

## ⚙️ Project Structure

- `etl_apache.py` – CLI runner for the ETL pipeline
- `parser.py` – Parses raw log lines using regex
- `database.py` – Manages DB schema, connection, and insert logic
- `summarizer.py` – Generates daily summary JSON reports
- `apache_logs.db` – Local SQLite DB
- `data/logs/apache_logs` – Raw Apache access log file (passed to `--log`)

---

## 🔍 Schema
![Database schema diagram](image.png)

## 🚀 Usage

```bash
python etl_apache.py --log data/logs/apache_logs
```
Binary file added __pycache__/database.cpython-312.pyc
Binary file not shown.
Binary file added __pycache__/parser.cpython-312.pyc
Binary file not shown.
69 changes: 68 additions & 1 deletion database.py
Original file line number Diff line number Diff line change
@@ -1 +1,68 @@
# Contains DB Connection and Schema, and insertion logic
import sqlite3

def connect_db(db_path="apache_logs.db"):
    """Open a SQLite connection for the log database.

    Args:
        db_path: Database location handed straight to ``sqlite3.connect``.

    Returns:
        The ``sqlite3.Connection`` opened for ``db_path``.
    """
    connection = sqlite3.connect(db_path)
    return connection

def create_tables(conn):
    """Create the ``logs`` and ``errors`` tables when they are missing.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this on an
    already-initialised database leaves existing tables untouched.

    Args:
        conn: Open ``sqlite3`` connection; the schema changes are committed
            on it before returning.
    """
    schema_statements = (
        # Parsed requests; signature_hash is UNIQUE so repeats can be rejected.
        """
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ip TEXT,
            timestamp TEXT,
            method TEXT,
            path TEXT,
            protocol TEXT,
            status INTEGER,
            bytes INTEGER,
            referrer TEXT,
            user_agent TEXT,
            signature_hash TEXT UNIQUE
        )
        """,
        # Lines that could not be parsed, with the reason they were rejected.
        """
        CREATE TABLE IF NOT EXISTS errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            raw_line TEXT,
            error_reason TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
    )

    db_cursor = conn.cursor()
    for ddl in schema_statements:
        db_cursor.execute(ddl)

    conn.commit()

def insert_logs(conn, logs_df):
    """Insert parsed log records into the ``logs`` table, skipping duplicates.

    Rows whose ``signature_hash`` already exists are ignored through
    ``INSERT OR IGNORE`` instead of raising and catching
    ``sqlite3.IntegrityError`` once per row. The whole batch runs in a single
    transaction: it is committed on success and rolled back if an unexpected
    error occurs.

    Args:
        conn: Open ``sqlite3`` connection to a database that has the ``logs``
            table.
        logs_df: DataFrame with the columns ``ip``, ``timestamp``, ``method``,
            ``path``, ``protocol``, ``status``, ``bytes_sent``, ``referrer``,
            ``user_agent`` and ``signature_hash``.

    Returns:
        int: Number of rows actually written (duplicates are not counted).
    """
    if logs_df.empty:
        return 0

    # DataFrame columns in table-column order; `bytes_sent` maps to `bytes`.
    source_columns = [
        "ip", "timestamp", "method", "path", "protocol", "status",
        "bytes_sent", "referrer", "user_agent", "signature_hash",
    ]
    # itertuples yields plain Python scalars, which sqlite3 can bind directly.
    records = logs_df[source_columns].itertuples(index=False, name=None)

    changes_before = conn.total_changes
    with conn:  # commit on success, roll back on error
        conn.executemany(
            """
            INSERT OR IGNORE INTO logs (
                ip, timestamp, method, path, protocol, status,
                bytes, referrer, user_agent, signature_hash
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            records,
        )
    return conn.total_changes - changes_before

def insert_errors(conn, errors_df):
    """Store rejected log lines in the ``errors`` table.

    Args:
        conn: Open ``sqlite3`` connection to a database that has the
            ``errors`` table.
        errors_df: DataFrame providing ``raw_log`` and ``error_reason`` for
            every rejected line; ``raw_log`` is written to ``raw_line``.
    """
    insert_sql = """
        INSERT INTO errors (raw_line, error_reason)
        VALUES (?, ?)
    """
    error_rows = [
        (record["raw_log"], record["error_reason"])
        for _, record in errors_df.iterrows()
    ]

    db_cursor = conn.cursor()
    db_cursor.executemany(insert_sql, error_rows)
    conn.commit()
51 changes: 50 additions & 1 deletion etl_apache.py
Original file line number Diff line number Diff line change
@@ -1 +1,50 @@
# Entry-point CLI
# etl_apache.py — Entry-point CLI tool for ETL
import argparse
import pandas as pd
from parser import transform_logs
from database import connect_db, create_tables, insert_logs, insert_errors
import chardet

def detect_encoding(file_path, sample_size=10000, default="utf-8"):
    """Guess the text encoding of a file from a leading byte sample.

    Args:
        file_path: Path of the file to inspect.
        sample_size: Number of bytes read from the start of the file and
            handed to ``chardet.detect``.
        default: Encoding returned when chardet reports no encoding (for
            example for an empty file), so callers never receive ``None`` and
            fall back to a platform-dependent locale encoding.

    Returns:
        str: The encoding name reported by chardet, or ``default``.
    """
    with open(file_path, 'rb') as f:
        raw = f.read(sample_size)  # a sample is enough; avoids reading huge logs twice
    return chardet.detect(raw)['encoding'] or default

def extract_logs(file_path):
    """Load a log file into a one-column DataFrame of stripped lines.

    The file is decoded with the encoding reported by ``detect_encoding``;
    undecodable bytes are replaced rather than raising.

    Args:
        file_path: Path of the log file to read.

    Returns:
        DataFrame with a single ``raw_log`` column, one row per line, with
        surrounding whitespace removed.
    """
    detected = detect_encoding(file_path)
    with open(file_path, 'r', encoding=detected, errors='replace') as log_file:
        stripped_lines = [entry.strip() for entry in log_file]
    return pd.DataFrame({'raw_log': stripped_lines})

def run_etl(log_path):
    """Run the extract -> transform -> load pipeline for one log file.

    Reads the file, ensures the schema exists, parses every line, stores the
    valid records in ``logs`` and the rejected lines in ``errors``, and prints
    progress messages along the way.

    Args:
        log_path: Path of the Apache log file to process.
    """
    print("🔄 Starting ETL pipeline...")

    raw_df = extract_logs(log_path)

    # Connect to SQLite and set up schema
    conn = connect_db()
    try:
        create_tables(conn)

        # Transform logs
        cleaned_df, malformed_df = transform_logs(raw_df)

        # Load valid logs
        if not cleaned_df.empty:
            insert_logs(conn, cleaned_df)
            # The count is the number of parsed rows; rows skipped as
            # duplicates by insert_logs are not subtracted.
            print(f"✅ Inserted {len(cleaned_df)} valid logs.")

        # Load errors
        if not malformed_df.empty:
            insert_errors(conn, malformed_df)
            print(f"⚠️ Logged {len(malformed_df)} malformed lines.")
    finally:
        # Always release the connection, even when a step above raises.
        conn.close()

    print(" ETL process completed.")

if __name__ == "__main__":
    # Command-line entry point: the required --log option names the file to load.
    cli = argparse.ArgumentParser(description="Run Apache Log ETL")
    cli.add_argument('--log', type=str, required=True, help='Path to Apache log file')
    run_etl(cli.parse_args().log)
Binary file added image.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
61 changes: 60 additions & 1 deletion parser.py
Original file line number Diff line number Diff line change
@@ -1 +1,60 @@
# Log Parsing Logic
import re
import pandas as pd
import hashlib
from datetime import datetime

# regex pattern
log_pattern = re.compile(
r'(?P<ip>\d+\.\d+\.\d+\.\d+)\s-\s-\s'
r'\[(?P<timestamp>[^\]]+)\]\s'
r'"(?P<method>\w+)\s(?P<path>.*?)\s(?P<protocol>HTTP/\d\.\d)"\s'
r'(?P<status>\d{3})\s(?P<bytes>\d+)\s'
r'"(?P<referrer>[^"]*)"\s'
r'"(?P<user_agent>[^"]*)"'
)

# Parse timestamp
def parse_timestamp(raw: str) -> str | None:
try:
return datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z").isoformat()
except Exception:
return None

# Generate unique hash for deduplication
def generate_hash(ip: str, timestamp: str, path: str) -> str:
return hashlib.md5(f"{ip}_{timestamp}_{path}".encode()).hexdigest()

# Main transform function
def transform_logs(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
structured_logs = []
malformed_logs = []

for raw in df["raw_log"]:
raw = raw.strip()
match = log_pattern.match(raw)
if not match:
malformed_logs.append({"raw_log": raw, "error_reason": "Regex match failed"})
continue

data = match.groupdict()
timestamp = parse_timestamp(data["timestamp"])
if not timestamp:
malformed_logs.append({"raw_log": raw, "error_reason": "Invalid timestamp"})
continue

uid = generate_hash(data["ip"], timestamp, data["path"])

structured_logs.append({
"signature_hash": uid,
"ip": data["ip"],
"timestamp": timestamp,
"method": data["method"],
"path": data["path"],
"protocol": data["protocol"],
"status": int(data["status"]),
"bytes_sent": int(data["bytes"]),
"referrer": data["referrer"],
"user_agent": data["user_agent"]
})

return pd.DataFrame(structured_logs), pd.DataFrame(malformed_logs)
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pandas
chardet
24 changes: 23 additions & 1 deletion summarizer.py
Original file line number Diff line number Diff line change
@@ -1 +1,23 @@
# Report Generation
# summarizer.py
import sqlite3
import pandas as pd
import json

def generate_daily_summary(date_str, db_path="apache_logs.db", output_dir="."):
    """Write a per-status hit count for one day to a JSON file.

    The date is bound as a query parameter rather than interpolated into the
    SQL text, so a crafted ``date_str`` cannot alter the statement.

    Args:
        date_str: Day to summarise, compared against ``date(timestamp)``
            (``YYYY-MM-DD``).
        db_path: SQLite database file containing the ``logs`` table.
        output_dir: Directory in which ``summary_<date_str>.json`` is created.

    The JSON file holds a list of ``{"log_date", "status", "hits"}`` records
    ordered by status; the list is empty when no rows match.
    """
    import os  # local import so this function does not depend on file-level imports

    query = """
        SELECT date(timestamp) AS log_date, status, COUNT(*) AS hits
        FROM logs
        WHERE date(timestamp) = ?
        GROUP BY status
        ORDER BY status
    """

    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn, params=(date_str,))
    finally:
        # Release the connection even if the query fails.
        conn.close()

    # Save as JSON instead of CSV
    output_filename = f"summary_{date_str}.json"
    output_path = os.path.join(output_dir, output_filename)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(df.to_dict(orient='records'), f, indent=2)

    print(f"📦 Daily summary saved as {output_filename}")