-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjava_audit.py
More file actions
296 lines (250 loc) · 11.4 KB
/
Copy pathjava_audit.py
File metadata and controls
296 lines (250 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#!/usr/bin/env python3
"""
============================================================
Java 源码离线审计工具 (Java Source Audit Tool)
============================================================
适用于 100MB+ 反编译 Java 源码包的自动化审计工具。
纯离线运行,不依赖 AI 大模型,输出 Markdown / JSON / 终端报告。
用法:
python3 java_audit.py <源码目录> # 终端摘要
python3 java_audit.py <源码目录> -o report.md # Markdown 报告
python3 java_audit.py <源码目录> -o report.json # JSON 报告
python3 java_audit.py <源码目录> --config-dir <配置目录> # 配置文件分析
python3 java_audit.py <源码目录> --project-only # 只看项目代码
python3 java_audit.py <源码目录> --attack-surface # 只看攻击面
python3 java_audit.py <源码目录> --workers 16 # 多线程加速
"""
import os
import sys
import time
import argparse
from pathlib import Path
# 将当前目录加入 sys.path 以保证模块导入正常
_THIS_DIR = Path(__file__).resolve().parent
if str(_THIS_DIR) not in sys.path:
sys.path.insert(0, str(_THIS_DIR))
from lib.scanner import Scanner
from lib.parser import JavaParser
from lib.classifier import PackageClassifier
from lib.analyzer import Analyzer
from lib.cve_matcher import CVEMatcher
from lib.exploit_chain import ExploitChainAnalyzer
from lib.reporter import MarkdownReporter, TerminalReporter, JSONReporter, CVEMatcher_dedup
from lib.models import AuditReport
class AuditEngine:
    """Audit engine: orchestrates the modules and holds no analysis logic.

    Responsibilities:
        1. Create the module instances.
        2. Invoke the modules in a fixed order.
        3. Aggregate their results into an ``AuditReport``.
    """

    def __init__(self, source_dir: str, config_dir: str = None,
                 workers: int = 4):
        """Create the engine and the module instances it drives.

        Args:
            source_dir: Directory containing the Java sources to audit.
            config_dir: Directory scanned for configuration files. When
                omitted (or empty) the source directory is used instead.
            workers: Worker count forwarded to ``JavaParser.parse_batch``.
        """
        self.source_dir = source_dir
        self.config_dir = config_dir or source_dir
        self.workers = workers
        # Module instances
        self.scanner = Scanner(source_dir, self.config_dir)
        self.parser = JavaParser()
        self.classifier = PackageClassifier()
        self.analyzer = Analyzer()

    @staticmethod
    def _print_parse_progress(done, all_):
        """Print an in-place progress line once every 500 parsed files."""
        if done % 500 == 0:
            print(f" 进度: {done}/{all_}", end='\r')

    def _collect_cve_matches(self, lib_dir):
        """Gather raw (not yet de-duplicated) CVE matches from jars and poms.

        Args:
            lib_dir: Result of ``Scanner.find_lib_dir()``; a falsy value
                means no library directory was found and jar matching is
                skipped.

        Returns:
            A list holding the jar-based matches followed by the pom-based
            matches.
        """
        matches = []
        # Match against jar files
        if lib_dir:
            matches.extend(CVEMatcher.match_from_jars(lib_dir))
        else:
            print(" [i] 未找到 BOOT-INF/lib 目录")
        # Match against pom.xml files
        pom_files = self.scanner.find_pom_files()
        if pom_files:
            matches.extend(CVEMatcher.match_from_pom(pom_files))
        else:
            print(" [i] 未找到 pom.xml")
        return matches

    def run(self) -> AuditReport:
        """Run the full audit pipeline and return the populated report.

        Stages: (1) scan for Java files, (2) parse them, (3) classify
        packages into project code vs. third-party libraries, (4) security
        analysis (components, attack surface, high-risk files, config risks,
        CVE matching, exploit chains, API endpoints), (5) fill in the
        remaining report fields.

        Returns:
            The ``AuditReport``. When no Java file is found, or none could be
            parsed, the report is returned early with only the fields filled
            in up to that point.
        """
        report = AuditReport()
        report.scan_time = time.strftime("%Y-%m-%d %H:%M:%S")
        report.source_dir = str(Path(self.source_dir).absolute())
        report.config_dir = str(Path(self.config_dir).absolute())
        t0 = time.time()
        print(f"\n[*] 开始扫描: {self.source_dir}")
        print(f"[*] 配置目录: {self.config_dir}")

        # ===== Stage 1: scan files =====
        print("[*] [1/5] 扫描文件...")
        java_paths, total, total_mb = self.scanner.scan_java_files()
        report.total_files = total
        report.total_size_mb = total_mb
        print(f" 找到 {total} 个 Java 文件,共 {total_mb:.1f} MB")
        if total == 0:
            print("[!] 未找到 Java 源文件")
            return report

        # ===== Stage 2: parse files =====
        print("[*] [2/5] 解析 Java 文件...")
        parsed = self.parser.parse_batch(
            java_paths,
            workers=self.workers,
            progress_cb=self._print_parse_progress,
        )
        print(f" 成功解析: {len(parsed)}/{total}")
        if not parsed:
            return report

        # ===== Stage 3: classify packages =====
        print("[*] [3/5] 分类包(项目代码 vs 第三方库)...")
        self.classifier.classify_all(parsed)

        # ===== Stage 4: security analysis =====
        print("[*] [4/5] 安全分析...")
        project_files = self.classifier.get_project_files(parsed)
        # Runtime components
        report.components = self.analyzer.analyze_components(project_files)
        # Attack surface
        attack_result = self.analyzer.analyze_attack_surface(project_files)
        report.attack_surface = attack_result["attack_files"]
        report.entry_points = attack_result["entry_points"]
        # High-risk files
        report.high_risk_files = self.analyzer.get_high_risk_files(project_files)
        # Configuration risks
        config_files = self.scanner.scan_config_files()
        report.config_issues = self.analyzer.analyze_config_risks(config_files)
        # CVE matching; lib_dir is looked up once and reused for the jar
        # listing further down.
        lib_dir = self.scanner.find_lib_dir()
        report.cve_matches = CVEMatcher.deduplicate(
            self._collect_cve_matches(lib_dir)
        )

        # ===== Exploit-chain analysis =====
        chain_analyzer = ExploitChainAnalyzer()
        # The library summary must be on the report *before* endpoint
        # inference reads report.library_summaries; previously it was only
        # assigned in stage 5, after the inference had already run.
        # NOTE(review): assumes get_library_summary() returns the same data
        # on repeated calls once classify_all() has run - confirm.
        library_summaries = self.classifier.get_library_summary()
        report.library_summaries = library_summaries
        lib_names = [lib.name for lib in library_summaries]
        # Jar file names allow more precise library matching
        jar_names = self.scanner.list_jar_files(lib_dir) if lib_dir else []
        report.exploit_chains = chain_analyzer.analyze(
            cve_matches=report.cve_matches,
            config_issues=report.config_issues,
            components=report.components,
            library_names=lib_names,
            jar_files=jar_names,
        )
        n_exploitable = chain_analyzer.count_exploitable(report.exploit_chains)
        n_partial = chain_analyzer.count_partial(report.exploit_chains)
        print(f" 攻击链: {n_exploitable} 条可利用, {n_partial} 条部分满足")

        # ===== API endpoint extraction =====
        # Every parsed file is examined, third-party ones included, because
        # controllers may also be detected there in decompiled sources.
        source_endpoints = self.parser.extract_all_api_endpoints(parsed)
        # Infer framework built-in endpoints from the configuration files
        # (Actuator / Gateway / Nacos / routes)
        inferred_endpoints = self.analyzer.infer_endpoints_from_config(
            config_files, report.library_summaries
        )
        # Merge: inferred endpoints first (more complete), source ones after
        report.api_endpoints = inferred_endpoints + source_endpoints
        print(f" API 端点: {len(report.api_endpoints)} 个"
              f" (源码 {len(source_endpoints)} + 推断 {len(inferred_endpoints)})")

        # ===== Stage 5: fill in remaining report data =====
        print("[*] [5/5] 生成报告...")
        report.project_files = project_files
        elapsed = time.time() - t0
        print(f"\n[+] 扫描完成,耗时 {elapsed:.1f}s")
        return report
# ============================================================
# 命令行入口
# ============================================================
def main():
    """Command-line entry point: parse arguments, run the audit, emit output.

    Exits with status 1 when the source directory does not exist and with
    argparse's usage-error status (2) when ``--workers`` is not a positive
    integer. Otherwise runs ``AuditEngine``, prints one terminal view (project
    list, attack surface, or the full terminal report) and, when ``--output``
    is given, also writes a JSON report (``.json`` suffix) or a Markdown
    report (any other suffix).
    """
    parser = argparse.ArgumentParser(
        description="Java 源码离线审计工具 - 分析大型 Java 源码包的结构与安全风险",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python3 java_audit.py /path/to/sources
python3 java_audit.py /path/to/sources -o report.md
python3 java_audit.py /path/to/sources -o report.json
python3 java_audit.py /path/to/sources --config-dir /path/to/config
python3 java_audit.py /path/to/sources --project-only
python3 java_audit.py /path/to/sources --attack-surface --workers 16
"""
    )
    parser.add_argument("source_dir", help="Java 源码目录路径")
    parser.add_argument("-o", "--output", help="输出文件路径(.md 为 Markdown, .json 为 JSON)")
    parser.add_argument("--config-dir", help="配置文件目录 (yaml/yml/properties/xml)")
    parser.add_argument("--attack-surface", action="store_true", help="仅显示攻击面速览")
    parser.add_argument("--project-only", action="store_true", help="仅显示项目自身代码清单")
    parser.add_argument("--workers", type=int, default=4, help="并行解析线程数(默认 4)")
    # NOTE(review): --no-color is accepted but nothing in this function reads
    # args.no_color; wire it into the terminal reporter or drop the flag.
    parser.add_argument("--no-color", action="store_true", help="禁用彩色终端输出")
    args = parser.parse_args()

    if not os.path.isdir(args.source_dir):
        # Diagnostics go to stderr so they never mix with report output.
        print(f"错误: 目录不存在 - {args.source_dir}", file=sys.stderr)
        sys.exit(1)
    if args.workers < 1:
        # Reject a meaningless worker count up front instead of letting it
        # fail somewhere inside the parsing stage.
        parser.error(f"--workers 必须为正整数 (当前值: {args.workers})")

    # Run the engine
    engine = AuditEngine(
        source_dir=args.source_dir,
        config_dir=args.config_dir,
        workers=args.workers,
    )
    report = engine.run()

    # Terminal output
    if args.project_only:
        _print_project_only(report)
    elif args.attack_surface:
        _print_attack_surface(report)
    else:
        TerminalReporter.generate(report)

    # File output
    if args.output:
        lowered = args.output.lower()
        if lowered.endswith('.json'):
            JSONReporter.generate(report, args.output)
            print(f"[+] JSON 报告已保存到: {args.output}")
        else:
            # Markdown is both the explicit (.md) and the fallback format;
            # only the confirmation message differs between the two.
            md = MarkdownReporter.generate(report)
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(md)
            if lowered.endswith('.md'):
                print(f"[+] Markdown 报告已保存到: {args.output}")
            else:
                print(f"[+] 报告已保存到: {args.output}")
def _print_project_only(report: AuditReport):
    """Print only the listing of the project's own source files.

    Files are listed in ascending ``path`` order. Each entry shows its path,
    package, class name and type, annotations, detected frameworks and, when
    the risk level is anything other than "INFO", the risk level.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f" 项目自身代码: {len(report.project_files)} 个文件")
    print(f"{rule}")

    ordered = sorted(report.project_files, key=lambda item: item.path)
    for entry in ordered:
        print(f"\n {entry.path}")
        print(f" 包: {entry.package}")
        print(f" 类: {entry.class_name} ({entry.class_type})")

        # Annotations are shown with a leading '@'; a placeholder is printed
        # when the file carries none.
        if entry.annotations:
            annotation_text = ', '.join(f'@{name}' for name in entry.annotations)
        else:
            annotation_text = '无'
        print(f" 注解: {annotation_text}")

        if entry.detected_frameworks:
            framework_text = ', '.join(entry.detected_frameworks)
        else:
            framework_text = '普通类'
        print(f" 组件: {framework_text}")

        if entry.risk_level == "INFO":
            continue
        print(f" 风险: [{entry.risk_level}]")
def _print_attack_surface(report: AuditReport):
    """Print only the attack-surface overview.

    Sections, in order: exposed components with their risk reasons, CVE
    matches (after passing through ``CVEMatcher_dedup``), configuration
    issues, and exploit chains flagged as exploitable (at most the first
    three steps of each). Every section after the first is omitted when it
    has nothing to show.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f" 攻击面: {len(report.attack_surface)} 个暴露组件")
    print(f"{rule}")

    for exposed in report.attack_surface:
        print(f" [{exposed.risk_level}] {exposed.class_name} - {exposed.detected_frameworks}")
        for reason in exposed.risk_reasons:
            print(f" └─ {reason}")

    cves = CVEMatcher_dedup(report.cve_matches)
    if cves:
        print(f"\n CVE 漏洞: {len(cves)} 个")
        for match in cves:
            print(f" [{match.severity}] {match.cve_id} - {match.component}@{match.version}")

    if report.config_issues:
        print(f"\n 配置风险: {len(report.config_issues)} 项")
        for issue in report.config_issues:
            print(f" [{issue.severity}] {issue.description}")

    # Keep only the chains marked exploitable.
    chains_exploitable = []
    for chain in report.exploit_chains:
        if chain.exploitable:
            chains_exploitable.append(chain)

    if chains_exploitable:
        print(f"\n 可利用攻击链: {len(chains_exploitable)} 条")
        for chain in chains_exploitable:
            print(f" [{chain.severity}] {chain.name}")
            # Only the first three steps are shown to keep the overview short.
            for step in chain.exploit_steps[:3]:
                print(f" └─ {step}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()