-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathosquery_hunter.py
More file actions
151 lines (128 loc) · 5.69 KB
/
osquery_hunter.py
File metadata and controls
151 lines (128 loc) · 5.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import subprocess, json, shutil
# Location of the osqueryi executable. Whatever is found on PATH wins; the
# hard-coded path below is only the fallback, so edit it to match your install.
_DEFAULT_OSQUERYI = r"C:\Program Files\osquery\osqueryi.exe"
OSQUERYI = shutil.which("osqueryi") or _DEFAULT_OSQUERYI
# Processes whose executable is NOT both (a) reported as trusted/valid by the
# authenticode table and (b) signed or issued by a name containing "microsoft".
#
# The COALESCE calls are load-bearing: with the LEFT JOIN, a process that has no
# matching authenticode row gets NULL for every a.* column. Without COALESCE the
# predicate evaluates to NOT (NULL) = NULL, and WHERE silently drops the row, so
# processes with no signature information would be hidden from the report.
QUERY = """
SELECT
  p.pid, p.name, p.path, p.parent, p.start_time,
  a.subject_name AS signer, a.issuer_name AS issuer, a.result AS sig_result
FROM processes p
LEFT JOIN authenticode a ON a.path = p.path
WHERE NOT (
  LOWER(COALESCE(a.result, '')) IN ('trusted','valid')
  AND (
    LOWER(COALESCE(a.subject_name, '')) LIKE '%microsoft%'
    OR LOWER(COALESCE(a.issuer_name, '')) LIKE '%microsoft%'
  )
)
ORDER BY p.pid;
"""
# One row per open socket that has a non-empty remote address, for all
# processes. Sorted by pid first so each process's sockets come out together.
SOCKETS_QUERY = """
SELECT pid, remote_address, remote_port, protocol, state
FROM process_open_sockets
WHERE remote_address != ''
ORDER BY pid, remote_address, remote_port;
"""
# Directory fragments that flag an executable as living in a user-writable
# location; they are compared as lowercase substrings of the process path.
SUSP_PATHS = (
    "\\Users\\",
    "\\AppData\\",
    "\\Temp\\",
    "\\ProgramData\\",
    "\\Windows\\Temp\\",
)

# Lowercase image names of "living off the land" binaries worth a closer look.
LOLBIN_NAMES = {
    "addinutil.exe",
    "bitsadmin.exe",
    "certutil.exe",
    "cmd.exe",
    "cscript.exe",
    "dllhost.exe",
    "fodhelper.exe",
    "installutil.exe",
    "mshta.exe",
    "msiexec.exe",
    "odbcconf.exe",
    "powershell.exe",
    "pwsh.exe",
    "regsvr32.exe",
    "rundll32.exe",
    "regsvr32.exe",
    "scriptrunner.exe",
    "wscript.exe",
}

# Lowercase image names of browsers / document apps that should not normally
# be the parent of a LOLBIN.
SUSP_PARENTS = {
    "acrord32.exe",
    "chrome.exe",
    "excel.exe",
    "firefox.exe",
    "msedge.exe",
    "outlook.exe",
    "powerpnt.exe",
    "winword.exe",
}
def is_zero(x):
    """Return True when *x* is one of the "nothing here" markers.

    The markers are None, 0, "0", "0.0.0.0" and "::". Note that the empty
    string is not one of them.
    """
    for marker in (None, 0, "0", "0.0.0.0", "::"):
        # Identity first, then equality: the same test the `in` operator applies.
        if x is marker or x == marker:
            return True
    return False
def _proto_name(v):
try:
n = int(v)
return {6: "TCP", 17: "UDP"}.get(n, str(v))
except Exception:
return v or ""
def run_osquery(sql: str):
    """Run one SQL statement through osqueryi and return its decoded JSON output.

    Args:
        sql: The query text, passed to osqueryi as a single argument
            (no shell is involved).

    Returns:
        Whatever ``json.loads`` produces from osqueryi's stdout; callers in this
        file iterate it as a sequence of row dicts.

    Raises:
        SystemExit: if osqueryi is not configured, cannot be started, exits
            with a non-zero status, or prints something that is not JSON.
    """
    if not OSQUERYI:
        raise SystemExit("osqueryi not found. Is it installed and on PATH?")
    try:
        proc = subprocess.run(
            [OSQUERYI, "--json", sql],
            capture_output=True, text=True, encoding="utf-8", errors="ignore",
        )
    except OSError as e:
        # OSQUERYI always holds some path, so a missing or non-executable
        # binary only shows up here (FileNotFoundError / PermissionError).
        raise SystemExit(
            f"osqueryi could not be started ({OSQUERYI}): {e}. "
            "Is it installed and on PATH?"
        ) from e
    if proc.returncode != 0:
        raise SystemExit(f"osqueryi failed: {proc.stderr.strip()}")
    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        raise SystemExit(f"Failed to parse osquery JSON: {e}") from e
def main():
    """Print a triage report for every process returned by QUERY.

    For each process the report shows pid, parent pid, name, path and start
    time, a one-line socket summary, warnings for suspicious paths, LOLBIN
    names and browser/Office -> LOLBIN parent chains, and finally each remote
    endpoint the process has a socket to.

    Raises:
        SystemExit: propagated from run_osquery when osqueryi fails.
    """
    from collections import defaultdict

    rows = run_osquery(QUERY)
    if not rows:
        print("No processes returned.")
        return

    # Group sockets per process. The pid is normalised to a string so the key
    # type is the same for socket rows and process rows.
    sockets_by_pid = defaultdict(list)
    for sock in run_osquery(SOCKETS_QUERY):
        sockets_by_pid[str(sock.get("pid"))].append(sock)

    # pid -> name for ALL processes. QUERY filters out trusted Microsoft-signed
    # processes, so a parent such as an Office app would be missing from `rows`;
    # an unfiltered listing is needed to resolve parent names.
    # NOTE(review): a parent that already exited cannot be resolved, and a
    # reused pid could resolve to an unrelated process.
    names_by_pid = {
        str(proc.get("pid")): (proc.get("name") or "")
        for proc in run_osquery("SELECT pid, name FROM processes;")
    }

    suspicious_fragments = tuple(fragment.lower() for fragment in SUSP_PATHS)

    for i, r in enumerate(rows, 1):
        pid = str(r.get("pid"))
        name = r.get("name") or ""
        path = r.get("path") or ""
        parent = r.get("parent")
        # Always a string so the "<6" format spec cannot fail on a missing value.
        ppid = "" if parent is None else str(parent)
        start = r.get("start_time", "")

        lower_name = name.lower()
        lower_path = path.lower()
        conns = sockets_by_pid.get(pid, [])

        print(f"{i:4d}. pid={pid:<6} ppid={ppid:<6} name={name}")
        if path:
            print(f" path: {path}")
        if start:
            print(f" start_time: {start}")

        if conns:
            # Summary line; sockets that report no state are counted together
            # with the ESTABLISHED ones.
            distinct_ips = {c.get("remote_address") for c in conns if c.get("remote_address")}
            established = sum(
                1 for c in conns
                if (c.get("state") or "").upper() == "ESTABLISHED" or not c.get("state")
            )
            print(f" net: sockets={len(conns)} established={established} peers={len(distinct_ips)}")

        # 1) Path anomalies (writable/user dirs)
        if any(fragment in lower_path for fragment in suspicious_fragments):
            print(" ⚠ writable/user path — verify why this binary lives here")

        # 2) LOLBINs
        is_lolbin = lower_name in LOLBIN_NAMES
        if is_lolbin:
            print(" ⚠ LOLBIN — review full command line and parent process")

        # 3) Suspicious parent → child combo (browser/Office spawning a LOLBIN).
        # The comparison is on the parent's image name, looked up by parent pid.
        parent_name = names_by_pid.get(ppid, "")
        if is_lolbin and parent_name.lower() in SUSP_PARENTS:
            print(f" ⚠ Suspicious parent→child chain: {parent_name} → {name}")

        # List every remote endpoint (ip:port proto state), skipping sockets
        # whose address or port is a zero/unset marker.
        shown = 0
        for c in conns:
            ra = c.get("remote_address")
            rp = c.get("remote_port")
            if is_zero(ra) or is_zero(rp):
                continue
            proto = _proto_name(c.get("protocol"))
            state = (c.get("state") or "").upper()
            print(f" -> {ra}:{rp} {proto} {state}")
            shown += 1
        if shown == 0:
            print(" net: no outbound sockets observed")
if __name__ == "__main__":
    main()
    # Closing guidance printed after the report: what the list means and which
    # manual checks to run on each entry.
    closing_notes = (
        "This list shows processes whose executable is not simultaneously (a) trusted/valid in the local Windows trust store and (b) signed by Microsoft.",
        "Final checks:",
        " • Path sanity: compare actual path vs usual vendor path (e.g., Chrome → C:\\Program Files\\Google\\...).",
        " • Unusual/writable paths: flag %TEMP%, %APPDATA%, Downloads, Desktop, C:\\Users\\* and C:\\ProgramData\\*.",
        " • Parent/child: odd parents (browser/Office launching LOLBINs like rundll32, regsvr32, mshta, wscript, powershell).",
        " • Network: review public IPs; look up suspicious ones in AbuseIPDB/VirusTotal manually; note RFC1918 vs public, ASN/CDN.",
        " • Signing: unsigned or self-signed binaries/DLLs, mismatched company name vs folder owner (e.g., ‘Microsoft’ under Google path).",
    )
    for note in closing_notes:
        print(note)