-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
278 lines (225 loc) · 12.3 KB
/
cli.py
File metadata and controls
278 lines (225 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
NetForensics/cli.py
────────────────────
Command-line interface for the NetForensics packet sniffer.
Usage examples:
sudo python cli.py --interface eth0 --duration 30
sudo python cli.py --interface eth0 --filter "tcp port 80" --output-dir ./captures --case CASE-001
sudo python cli.py --interface eth0 --count 500 --format json,html --operator "J. Smith"
Author : Ahmad Raza
License : MIT
"""
import argparse
import os
import signal
import sys
import time
from datetime import datetime
# ── Ensure project root is on sys.path ────────────────────────────────────────
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.sniffer import PacketRecord, PacketSniffer
from reports.generator import export_csv, export_html, export_json, export_txt
# ── ANSI colour helpers ───────────────────────────────────────────────────────
def _c(code: str, text: str) -> str:
"""Wrap text in an ANSI colour if stdout is a tty."""
if sys.stdout.isatty():
return f"\033[{code}m{text}\033[0m"
return text
def red(t): return _c("31", t)
def green(t): return _c("32", t)
def yellow(t): return _c("33", t)
def cyan(t): return _c("36", t)
def bold(t): return _c("1", t)
def dim(t): return _c("2", t)
# ── Live packet printer ───────────────────────────────────────────────────────
def _live_print(record: PacketRecord):
"""Print a one-line summary of each captured packet to stdout."""
risk = record.risk_score
if risk >= 75:
risk_str = red(f"[CRIT {risk:>3}]")
elif risk >= 50:
risk_str = yellow(f"[HIGH {risk:>3}]")
elif risk >= 25:
risk_str = yellow(f"[MED {risk:>3}]")
else:
risk_str = green(f"[LOW {risk:>3}]")
src = f"{record.src_ip or '?'}:{record.src_port or ''}"
dst = f"{record.dst_ip or '?'}:{record.dst_port or ''}"
proto = cyan(f"{record.protocol:<6}")
flags = f" [{record.tcp_flags}]" if record.tcp_flags else ""
dns = f" DNS?{record.dns_query}" if record.dns_query else ""
anomaly = red(f" ⚠ {'; '.join(record.anomalies)}") if record.anomalies else ""
print(
f" #{record.packet_id:<6} {dim(record.capture_timestamp_utc[-15:])} "
f"{proto} {src:>22} → {dst:<22} {risk_str}{flags}{dns}{anomaly}"
)
# ── Banner ────────────────────────────────────────────────────────────────────
BANNER = r"""
███╗ ██╗███████╗████████╗███████╗ ██████╗ ██████╗ ███████╗
████╗ ██║██╔════╝╚══██╔══╝██╔════╝██╔═══██╗██╔══██╗██╔════╝
██╔██╗ ██║█████╗ ██║ █████╗ ██║ ██║██████╔╝███████╗
██║╚██╗██║██╔══╝ ██║ ██╔══╝ ██║ ██║██╔══██╗╚════██║
██║ ╚████║███████╗ ██║ ██║ ╚██████╔╝██║ ██║███████║
╚═╝ ╚═══╝╚══════╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚══════╝
Forensic Packet Intelligence Platform v1.0.0
"""
# ── Argument parser ───────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="netforensics",
description="NetForensics – Forensic packet sniffer with court-ready reporting.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
sudo python cli.py -i eth0 -d 60
sudo python cli.py -i eth0 -f "tcp port 443" --count 1000 --case CASE-2024-001
sudo python cli.py -i eth0 -d 120 --format json,html --operator "Jane Doe" --output-dir ./evidence
""",
)
# Capture settings
cap = p.add_argument_group("Capture")
cap.add_argument("-i", "--interface", default=None,
help="Network interface (default: Scapy auto-select)")
cap.add_argument("-f", "--filter", dest="bpf_filter", default="",
metavar="BPF",
help="BPF capture filter e.g. 'tcp port 80'")
cap.add_argument("-c", "--count", type=int, default=0,
help="Stop after N packets (default: 0 = unlimited)")
cap.add_argument("-d", "--duration", type=float, default=0,
help="Stop after N seconds (default: 0 = until Ctrl-C)")
# Report settings
rep = p.add_argument_group("Report")
rep.add_argument("--format", default="json,txt",
help="Comma-separated output formats: json,csv,txt,html (default: json,txt)")
rep.add_argument("--output-dir", default="./netforensics_output",
metavar="DIR",
help="Directory for report files (default: ./netforensics_output)")
rep.add_argument("--case", default="N/A", metavar="ID",
help="Case / evidence identifier")
rep.add_argument("--operator", default="Unknown",
help="Name of the capturing analyst")
rep.add_argument("--notes", default="",
help="Free-text notes appended to the report")
# Display
disp = p.add_argument_group("Display")
disp.add_argument("-q", "--quiet", action="store_true",
help="Suppress per-packet output (only print summary)")
disp.add_argument("--no-banner", action="store_true",
help="Suppress the ASCII banner")
return p
# ── Main entry point ──────────────────────────────────────────────────────────
def main():
parser = build_parser()
args = parser.parse_args()
if not args.no_banner:
print(cyan(BANNER))
# ── Privilege warning ─────────────────────────────────────────────────────
if os.name != "nt" and os.geteuid() != 0:
print(yellow(" ⚠ Warning: packet capture typically requires root privileges."))
print(yellow(" Run with sudo or as Administrator.\n"))
# ── Resolve output formats ────────────────────────────────────────────────
formats = {f.strip().lower() for f in args.format.split(",")}
valid_formats = {"json", "csv", "txt", "html"}
invalid = formats - valid_formats
if invalid:
print(red(f" ✗ Unknown format(s): {', '.join(invalid)}"))
print(f" Valid options: {', '.join(sorted(valid_formats))}")
sys.exit(1)
# ── Print session configuration ───────────────────────────────────────────
print(bold(" SESSION CONFIGURATION"))
print(f" Interface : {args.interface or 'auto'}")
print(f" BPF Filter : {args.bpf_filter or 'none'}")
print(f" Pkt limit : {args.count or 'unlimited'}")
print(f" Duration : {args.duration or 'unlimited'} s")
print(f" Formats : {', '.join(sorted(formats))}")
print(f" Output Dir : {args.output_dir}")
print(f" Case ID : {args.case}")
print(f" Operator : {args.operator}")
print()
# ── Set up sniffer ────────────────────────────────────────────────────────
callback = None if args.quiet else _live_print
try:
sniffer = PacketSniffer(
interface=args.interface,
bpf_filter=args.bpf_filter,
packet_count=args.count,
callback=callback,
)
except RuntimeError as e:
print(red(f" ✗ {e}"))
sys.exit(1)
if not args.quiet:
print(dim(" #ID Timestamp Proto Source → Destination [Risk]"))
print(dim(" " + "─" * 98))
# ── Graceful SIGINT handler ───────────────────────────────────────────────
def _sigint_handler(sig, frame):
print(f"\n{yellow(' ⏹ Interrupt received — stopping capture...')}")
sniffer.stop()
signal.signal(signal.SIGINT, _sigint_handler)
# ── Start capture ─────────────────────────────────────────────────────────
print(green(" ▶ Capture started. Press Ctrl-C to stop.\n"))
sniffer.start()
try:
if args.duration > 0:
time.sleep(args.duration)
sniffer.stop()
else:
while sniffer.is_running():
time.sleep(0.5)
except KeyboardInterrupt:
sniffer.stop()
# ── Collect results ───────────────────────────────────────────────────────
records = sniffer.get_records()
stats = sniffer.get_statistics()
print()
print(bold(" CAPTURE SUMMARY"))
print(f" Packets captured : {len(records)}")
print(f" Duration : {stats.get('duration_seconds', 0)} s")
print(f" Throughput : {stats.get('packets_per_second', 0)} pkts/s")
risk = stats.get("risk_distribution", {})
print(f" Risk → Critical: {red(str(risk.get('critical',0)))} "
f"High: {yellow(str(risk.get('high',0)))} "
f"Medium: {yellow(str(risk.get('medium',0)))} "
f"Low: {green(str(risk.get('low',0)))}")
anomalies = stats.get("anomalies", [])
if anomalies:
print(f"\n {red('⚠')} {len(anomalies)} anomalies detected:")
for a in anomalies[:10]:
print(f" Pkt #{a['packet_id']:>6} {a.get('src_ip','?')} → {a.get('dst_ip','?')} "
f"Risk:{a['risk_score']:>3} {a['anomaly']}")
if len(anomalies) > 10:
print(f" … and {len(anomalies) - 10} more (see report)")
# ── Export reports ────────────────────────────────────────────────────────
if not records:
print(yellow("\n No packets captured — no report generated."))
return
os.makedirs(args.output_dir, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
base = os.path.join(args.output_dir, f"netforensics_{ts}")
print(f"\n{bold(' EXPORTING REPORTS')}")
export_kwargs = dict(
records=records,
stats=stats,
operator=args.operator,
case_id=args.case,
notes=args.notes,
)
if "json" in formats:
path = f"{base}.json"
export_json(**export_kwargs, output_path=path)
print(f" {green('✓')} JSON : {path}")
if "csv" in formats:
path = f"{base}.csv"
export_csv(records=records, output_path=path)
print(f" {green('✓')} CSV : {path}")
if "txt" in formats:
path = f"{base}.txt"
export_txt(**export_kwargs, output_path=path)
print(f" {green('✓')} TXT : {path}")
if "html" in formats:
path = f"{base}.html"
export_html(**export_kwargs, output_path=path)
print(f" {green('✓')} HTML : {path}")
print(f"\n{green(' ✓ Done.')} Reports written to {bold(args.output_dir)}\n")
if __name__ == "__main__":
main()