-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathextractor.py
More file actions
351 lines (288 loc) · 12.6 KB
/
extractor.py
File metadata and controls
351 lines (288 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""
Core extraction engine with validation and retry logic.
This module implements the heart of the reliable extraction system:
1. LLM-based extraction using function calling
2. Pydantic validation
3. Automatic retry with error feedback
4. Comprehensive logging
"""
import json
import logging
from typing import Optional, Type, Dict, Any, List
from datetime import datetime
from pydantic import BaseModel, ValidationError
import requests
from .schemas import EXTRACTION_SCHEMAS, ExtractionSchema
logger = logging.getLogger(__name__)
class ExtractionAttempt(BaseModel):
    """Record of a single extraction attempt.

    One instance is appended to the attempt history for every round trip
    the extractor makes to the LLM, whether it succeeded or failed.
    """

    # 1-based position of this attempt within the retry loop
    attempt_number: int
    # True when the response parsed as JSON and passed schema validation
    success: bool
    # JSON-serialized payload from the model; None if parsing never happened
    raw_response: Optional[str] = None
    # Human-readable error messages for a failed attempt
    validation_errors: Optional[List[str]] = None
    # Parsed-but-unvalidated dict from the model response
    extracted_data: Optional[Dict[str, Any]] = None
    # ISO-8601 timestamp of when the attempt was recorded
    timestamp: str
class ExtractionResult(BaseModel):
    """Complete result of an extraction operation.

    Aggregates the final outcome plus the full per-attempt history so
    callers can inspect why retries happened.
    """

    # True when some attempt produced schema-valid data
    success: bool
    # The validated payload; None when extraction failed
    data: Optional[ExtractionSchema] = None
    # Chronological record of every attempt made
    attempts: List[ExtractionAttempt]
    # Number of attempts actually executed
    total_attempts: int
    # Summary of the failure; None on success
    error_message: Optional[str] = None

    # pydantic v2 config: lets 'data' hold types pydantic cannot natively
    # validate (ExtractionSchema comes from .schemas — presumably includes
    # such types; confirm against that module)
    model_config = {"arbitrary_types_allowed": True}
class LLMExtractor:
    """
    Robust LLM-based data extractor with validation and retry logic.

    This class demonstrates production-grade patterns for reliable LLM systems:
    - Structured output prompting for free local models (Ollama)
    - Schema validation with Pydantic
    - Automatic retry with error feedback
    - Comprehensive logging for debugging
    """

    def __init__(
        self,
        model: str = "llama3.2",
        ollama_host: str = "http://localhost:11434",
        temperature: float = 0.1,
        max_retries: int = 3
    ):
        """
        Initialize the extractor.

        Args:
            model: Model name (default: llama3.2 - free via Ollama)
            ollama_host: Ollama server URL
            temperature: Sampling temperature; low for determinism (0-0.2 recommended)
            max_retries: Number of retry attempts on validation failure
        """
        self.model = model
        self.ollama_host = ollama_host
        self.temperature = temperature
        self.max_retries = max_retries

        logger.info(
            f"Initialized LLMExtractor: model={model}, "
            f"temperature={temperature}, max_retries={max_retries}"
        )

        # Verify Ollama is accessible (best-effort health check).
        # FIX: a timeout keeps __init__ from hanging indefinitely when the
        # host is unreachable; requests has no default timeout.
        try:
            response = requests.get(f"{ollama_host}/api/tags", timeout=5)
            if response.status_code != 200:
                logger.warning("Ollama server might not be running. Please start Ollama.")
        except Exception as e:
            logger.warning(f"Could not connect to Ollama: {e}. Make sure Ollama is installed and running.")

    def extract(
        self,
        text: str,
        schema_type: str,
        additional_context: Optional[str] = None
    ) -> ExtractionResult:
        """
        Extract structured data from unstructured text.

        Args:
            text: Raw input text (email, invoice, etc.)
            schema_type: Type of data to extract ('invoice', 'email', 'support_ticket')
            additional_context: Optional context to guide extraction

        Returns:
            ExtractionResult with success status and extracted data or errors

        Raises:
            ValueError: If schema_type is not a registered extraction schema.
        """
        if schema_type not in EXTRACTION_SCHEMAS:
            raise ValueError(
                f"Unknown schema type: {schema_type}. "
                f"Must be one of {list(EXTRACTION_SCHEMAS.keys())}"
            )

        schema_info = EXTRACTION_SCHEMAS[schema_type]
        schema_model: Type[BaseModel] = schema_info["model"]

        logger.info(f"Starting extraction for schema type: {schema_type}")
        logger.debug(f"Input text length: {len(text)} characters")

        attempts: List[ExtractionAttempt] = []
        validation_feedback: Optional[str] = None

        for attempt_num in range(1, self.max_retries + 1):
            logger.info(f"Extraction attempt {attempt_num}/{self.max_retries}")
            # FIX: track the parsed payload explicitly rather than probing
            # locals() inside the except handler.
            extracted_dict: Optional[Dict[str, Any]] = None
            try:
                # Call LLM with structured output prompting
                raw_response = self._call_llm(
                    text=text,
                    schema_type=schema_type,
                    additional_context=additional_context,
                    validation_feedback=validation_feedback,
                    attempt_number=attempt_num
                )

                # Parse JSON response
                extracted_dict = self._parse_json_response(raw_response)

                # Validate with Pydantic
                validated_data = schema_model(**extracted_dict)

                # Success!
                attempt = ExtractionAttempt(
                    attempt_number=attempt_num,
                    success=True,
                    raw_response=json.dumps(extracted_dict),
                    extracted_data=extracted_dict,
                    timestamp=datetime.now().isoformat()
                )
                attempts.append(attempt)
                logger.info(f"✓ Extraction succeeded on attempt {attempt_num}")
                return ExtractionResult(
                    success=True,
                    data=validated_data,
                    attempts=attempts,
                    total_attempts=attempt_num
                )

            except ValidationError as e:
                # Validation failed - prepare feedback for retry.
                # FIX: guard against an empty 'loc' tuple (model-level
                # errors), which previously raised IndexError here; the
                # guard matches _build_validation_feedback.
                error_messages = [
                    f"{err['loc'][0] if err['loc'] else 'unknown'}: {err['msg']}"
                    for err in e.errors()
                ]
                attempt = ExtractionAttempt(
                    attempt_number=attempt_num,
                    success=False,
                    raw_response=json.dumps(extracted_dict) if extracted_dict is not None else None,
                    validation_errors=error_messages,
                    timestamp=datetime.now().isoformat()
                )
                attempts.append(attempt)
                logger.warning(
                    f"✗ Validation failed on attempt {attempt_num}: "
                    f"{'; '.join(error_messages)}"
                )

                # Build feedback so the next attempt can self-correct
                validation_feedback = self._build_validation_feedback(
                    e, extracted_dict if extracted_dict is not None else {}
                )

                # If this was the last attempt, return failure
                if attempt_num == self.max_retries:
                    logger.error(f"Extraction failed after {self.max_retries} attempts")
                    return ExtractionResult(
                        success=False,
                        attempts=attempts,
                        total_attempts=attempt_num,
                        error_message=f"Validation failed after {self.max_retries} attempts: {'; '.join(error_messages)}"
                    )

            except Exception as e:
                # Unexpected error (API error, parsing error, etc.)
                attempt = ExtractionAttempt(
                    attempt_number=attempt_num,
                    success=False,
                    validation_errors=[str(e)],
                    timestamp=datetime.now().isoformat()
                )
                attempts.append(attempt)
                logger.error(f"✗ Unexpected error on attempt {attempt_num}: {e}")
                # Don't retry on unexpected errors - feedback to the model
                # cannot fix e.g. a down server or a network failure.
                return ExtractionResult(
                    success=False,
                    attempts=attempts,
                    total_attempts=attempt_num,
                    error_message=f"Unexpected error: {str(e)}"
                )

        # Defensive fallback; the loop above always returns before this.
        return ExtractionResult(
            success=False,
            attempts=attempts,
            total_attempts=len(attempts),
            error_message="Unknown error"
        )

    def _call_llm(
        self,
        text: str,
        schema_type: str,
        additional_context: Optional[str],
        validation_feedback: Optional[str],
        attempt_number: int
    ) -> str:
        """Call Ollama API with structured output prompting.

        Returns the raw text completion from the model.

        Raises:
            RuntimeError: If the HTTP request to Ollama fails.
        """
        schema_info = EXTRACTION_SCHEMAS[schema_type]
        schema_model = schema_info["model"]

        # Build system prompt (carries validation feedback on retries)
        system_prompt = self._build_system_prompt(
            schema_type,
            additional_context,
            validation_feedback,
            attempt_number
        )

        # Build user prompt with JSON schema
        json_schema = schema_model.model_json_schema()
        user_prompt = f"""Extract structured data from the following text and return it as valid JSON.

TEXT TO EXTRACT FROM:
{text}

REQUIRED JSON SCHEMA:
{json.dumps(json_schema, indent=2)}

Return ONLY the JSON object, no explanations or markdown. Ensure all required fields are present."""

        logger.debug(f"Calling Ollama API: model={self.model}, temp={self.temperature}")

        # Call Ollama API
        try:
            response = requests.post(
                f"{self.ollama_host}/api/generate",
                json={
                    "model": self.model,
                    "prompt": f"System: {system_prompt}\n\nUser: {user_prompt}",
                    "stream": False,
                    "options": {
                        "temperature": self.temperature
                    }
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            return result.get("response", "")
        except requests.exceptions.RequestException as e:
            # FIX: chain the underlying exception so the original cause
            # (timeout, connection refused, HTTP status) stays visible.
            raise RuntimeError(f"Ollama API error: {e}. Make sure Ollama is running.") from e

    def _parse_json_response(self, response: str) -> Dict[str, Any]:
        """Extract and parse JSON from an LLM response.

        Tolerates markdown code fences and surrounding commentary.

        Raises:
            ValueError: If no parseable JSON object can be found.
        """
        # Try to find JSON in the response
        response = response.strip()

        # Remove markdown code blocks if present
        if response.startswith("```"):
            lines = response.split("\n")
            response = "\n".join(lines[1:-1]) if len(lines) > 2 else response
            # Strip a leftover "json" language tag
            if response.startswith("json"):
                response = response[4:].strip()

        # Try to parse JSON
        try:
            return json.loads(response)
        except json.JSONDecodeError as e:
            # Try to extract JSON from mixed content (prose around the object)
            import re
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            if json_match:
                try:
                    return json.loads(json_match.group())
                except json.JSONDecodeError:
                    pass
            raise ValueError(f"Could not parse JSON from response: {e}")

    def _build_system_prompt(
        self,
        schema_type: str,
        additional_context: Optional[str],
        validation_feedback: Optional[str],
        attempt_number: int
    ) -> str:
        """Build system prompt with retry feedback if applicable."""
        base_prompt = f"""You are a precise data extraction system. Extract structured data from {schema_type}s.

Rules:
1. Extract ONLY information present in the text
2. Do NOT hallucinate or invent data
3. Use exact dates in YYYY-MM-DD format
4. Use exact numbers (don't round unless necessary)
5. For optional fields, use null if not present
6. Be precise with email addresses, IDs, and codes"""

        if additional_context:
            base_prompt += f"\n\nAdditional context: {additional_context}"

        # Only surface feedback on genuine retries
        if validation_feedback and attempt_number > 1:
            base_prompt += f"\n\n⚠️ PREVIOUS ATTEMPT FAILED ⚠️\n{validation_feedback}\n\nPlease fix these issues in this attempt."

        return base_prompt

    def _build_validation_feedback(
        self,
        error: ValidationError,
        extracted_dict: Dict[str, Any]
    ) -> str:
        """Build detailed feedback for the retry attempt."""
        feedback_lines = ["The previous extraction had the following validation errors:\n"]

        for err in error.errors():
            field = err['loc'][0] if err['loc'] else 'unknown'
            message = err['msg']
            error_type = err['type']
            # Show the model what it actually produced for the bad field
            actual_value = extracted_dict.get(field, "MISSING")
            feedback_lines.append(
                f"- Field '{field}': {message} (type: {error_type}, value: {actual_value})"
            )

        feedback_lines.append("\nPlease correct these errors and ensure all fields match the schema requirements.")
        return "\n".join(feedback_lines)