-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdetails.py
More file actions
131 lines (98 loc) · 3.11 KB
/
Copy pathdetails.py
File metadata and controls
131 lines (98 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from dataclasses import dataclass
from typing import Protocol
from context import AnomalyContextV2
# ---------- Output Model ----------
@dataclass(frozen=True)
class ExplanationV2:
summary: str
why_it_matters: str
where_to_look: str
confidence: float
# ---------- LLM Interface ----------
class LLMClient(Protocol):
def complete(self, prompt: str) -> str:
...
# ---------- Explainer ----------
class ExplainerV2:
def __init__(self, llm: LLMClient):
self.llm = llm
def explain(self, ctx: AnomalyContextV2) -> ExplanationV2:
prompt = self._build_prompt(ctx)
raw = self.llm.complete(prompt)
return self._parse_response(raw)
# ---------- Prompt ----------
def _build_prompt(self, ctx: AnomalyContextV2) -> str:
a = ctx.anomaly
svc, level, template = a.key
deploy_info = (
f"Yes – version {ctx.deploy_event.version} at {ctx.deploy_event.timestamp}"
if ctx.deploy_event
else "No deploy detected in this window"
)
return f"""
You are a senior production engineer assisting during an active incident.
You MUST follow the rules exactly.
Facts:
- Service: {svc}
- Log level: {level}
- Pattern: "{template}"
- Reason flagged: {a.reason}
- Severity score: {a.severity:.2f}
- First seen: {a.first_seen}
- Last seen: {a.last_seen}
- Recent weighted count: {a.recent_weighted}
- Baseline weighted avg: {a.baseline_weighted}
Context window:
- From: {ctx.window_start}
- To: {ctx.window_end}
Level breakdown in window:
{ctx.level_breakdown}
Related patterns (same service):
{ctx.related_patterns}
Deploy correlation:
{deploy_info}
Rules:
- Do NOT mention security issues unless explicitly stated in the logs
- Do NOT use words like "breach", "attack", or "unauthorized"
- Do NOT propose fixes
- Do NOT speculate beyond the facts
- Do NOT claim causation
- Explain impact and investigation direction only
Return output in EXACTLY this format:
SUMMARY:
<1 short paragraph>
WHY IT MATTERS:
<1 short paragraph>
WHERE TO LOOK:
- <bullet>
- <bullet>
CONFIDENCE:
<number between 0 and 1>
"""
# ---------- Parsing ----------
def _parse_response(self, text: str) -> ExplanationV2:
sections = {}
current = None
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.endswith(":"):
current = line[:-1]
sections[current] = []
elif current:
sections[current].append(line)
try:
confidence = float(sections["CONFIDENCE"][0])
if not 0.0 <= confidence <= 1.0:
raise ValueError("confidence out of range")
return ExplanationV2(
summary=" ".join(sections["SUMMARY"]),
why_it_matters=" ".join(sections["WHY IT MATTERS"]),
where_to_look="\n".join(sections["WHERE TO LOOK"]),
confidence=confidence,
)
except Exception as e:
raise ValueError(
f"Malformed LLM response:\n{text}"
) from e