Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ class FormSubmission(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
template_id: int
input_text: str
output_pdf_path: str
output_pdf_path: str | None = None
status: str = Field(default="completed")
extracted_data: dict = Field(default_factory=dict, sa_column=Column(JSON))
missing_fields: list = Field(default_factory=list, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=datetime.utcnow)
70 changes: 64 additions & 6 deletions api/routes/forms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends
from sqlmodel import Session
from api.deps import get_db
from api.schemas.forms import FormFill, FormFillResponse
from api.schemas.forms import FormFill, FormFeedback, FormFillResponse
from api.db.repositories import create_form, get_template
from api.db.models import FormSubmission
from api.errors.base import AppError
Expand All @@ -11,15 +11,73 @@

@router.post("/fill", response_model=FormFillResponse)
def fill_form(form: FormFill, db: Session = Depends(get_db)):
if not get_template(db, form.template_id):
raise AppError("Template not found", status_code=404)

fetched_template = get_template(db, form.template_id)
if not fetched_template:
raise AppError("Template not found", status_code=404)

controller = Controller()
path = controller.fill_form(user_input=form.input_text, fields=fetched_template.fields, pdf_form_path=fetched_template.pdf_path)

extracted_data, missing_fields = controller.extract_data(
user_input=form.input_text,
fields=fetched_template.fields
)

submission = FormSubmission(**form.model_dump(), output_pdf_path=path)
if missing_fields:
status = "missing_data"
path = None
else:
status = "completed"
path = controller.fill_pdf(answers=extracted_data, pdf_form_path=fetched_template.pdf_path)

submission = FormSubmission(
template_id=form.template_id,
input_text=form.input_text,
output_pdf_path=path,
status=status,
extracted_data=extracted_data,
missing_fields=missing_fields
)
return create_form(db, submission)


@router.post("/{submission_id}/feedback", response_model=FormFillResponse)
def form_feedback(submission_id: int, feedback: FormFeedback, db: Session = Depends(get_db)):
    """Continue an incomplete submission using follow-up text from the user.

    Extraction is re-run on ``feedback.input_text`` for only the fields still
    recorded as missing, on top of the data already stored on the submission.
    The PDF is filled once no field is reported missing.

    Raises:
        AppError: 404 if the submission or its template does not exist,
            400 if the submission is already completed.
    """
    from copy import deepcopy

    submission = db.get(FormSubmission, submission_id)
    if not submission:
        raise AppError("Form submission not found", status_code=404)

    if submission.status == "completed":
        raise AppError("Form already completed", status_code=400)

    fetched_template = get_template(db, submission.template_id)
    if not fetched_template:
        raise AppError("Template not found", status_code=404)

    controller = Controller()

    # Only target missing fields from the template. ``or []`` keeps rows whose
    # JSON column is NULL (e.g. written before the column existed) iterable.
    target_fields = {
        field: fetched_template.fields[field]
        for field in (submission.missing_fields or [])
        if field in fetched_template.fields
    }

    # Hand the extractor a private copy of the stored answers. The extractor
    # may mutate and return the very dict it is given; assigning that same
    # object back to a plain JSON column makes the old and new values compare
    # equal, so the ORM would not write the merged answers to the database.
    extracted_data, missing_fields = controller.extract_data(
        user_input=feedback.input_text,
        fields=target_fields,
        existing_data=deepcopy(submission.extracted_data),
    )

    if missing_fields:
        submission.status = "missing_data"
        submission.output_pdf_path = None
    else:
        submission.status = "completed"
        submission.output_pdf_path = controller.fill_pdf(
            answers=extracted_data,
            pdf_form_path=fetched_template.pdf_path,
        )

    submission.extracted_data = extracted_data
    submission.missing_fields = missing_fields

    db.add(submission)
    db.commit()
    db.refresh(submission)

    return submission


8 changes: 7 additions & 1 deletion api/schemas/forms.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from pydantic import BaseModel
from typing import Optional

# Request body for starting a form fill.
class FormFill(BaseModel):
    # Identifier of the template whose fields should be extracted.
    template_id: int
    # Free-form text the field values are extracted from.
    input_text: str

# Request body for the follow-up (feedback) step of an incomplete submission.
class FormFeedback(BaseModel):
    # Additional free-form text supplying the values that were still missing.
    input_text: str

class FormFillResponse(BaseModel):
id: int
template_id: int
input_text: str
output_pdf_path: str
status: str
output_pdf_path: Optional[str] = None
extracted_data: dict = {}
missing_fields: list = []

class Config:
from_attributes = True
7 changes: 5 additions & 2 deletions src/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ class Controller:
def __init__(self):
    # Each controller owns its own FileManipulator instance.
    self.file_manipulator = FileManipulator()

def fill_form(self, user_input: str, fields: list, pdf_form_path: str):
return self.file_manipulator.fill_form(user_input, fields, pdf_form_path)
def extract_data(self, user_input: str, fields: dict, existing_data: dict = None):
return self.file_manipulator.extract_data(user_input, fields, existing_data)

def fill_pdf(self, answers: dict, pdf_form_path: str):
    """Fill the PDF form at ``pdf_form_path`` with ``answers``.

    Delegates to the file manipulator and returns its result as-is.
    """
    manipulator = self.file_manipulator
    result = manipulator.fill_pdf(answers, pdf_form_path)
    return result

def create_template(self, pdf_path: str):
    """Create a template from the PDF at ``pdf_path``.

    Delegates to the file manipulator and returns its result as-is.
    """
    manipulator = self.file_manipulator
    result = manipulator.create_template(pdf_path)
    return result
30 changes: 17 additions & 13 deletions src/file_manipulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,28 @@ def create_template(self, pdf_path: str):
prepare_form(pdf_path, template_path)
return template_path

def fill_form(self, user_input: str, fields: list, pdf_form_path: str):
def extract_data(self, user_input: str, fields: dict, existing_data: dict = None):
"""
It receives the raw data, runs the PDF filling logic,
and returns the path to the newly created file.
Runs LLM to extract data. Returns extracted_data and missing_fields.
"""
print("[1] Received request from frontend.")
print(f"[2] PDF template path: {pdf_form_path}")

print("[1] Starting extraction process...")
if existing_data is None:
existing_data = {}

llm = LLM(transcript_text=user_input, target_fields=fields, json=existing_data)
llm.main_loop()
return llm.get_data(), llm.get_missing_fields()

def fill_pdf(self, answers: dict, pdf_form_path: str):
"""
Receives extracted data and fills the PDF.
"""
print(f"[2] Filling PDF template: {pdf_form_path}")
if not os.path.exists(pdf_form_path):
print(f"Error: PDF template not found at {pdf_form_path}")
return None # Or raise an exception
raise FileNotFoundError(f"PDF template not found at {pdf_form_path}")

print("[3] Starting extraction and PDF filling process...")
try:
self.llm._target_fields = fields
self.llm._transcript_text = user_input
output_name = self.filler.fill_form(pdf_form=pdf_form_path, llm=self.llm)
output_name = self.filler.fill_form(pdf_form=pdf_form_path, answers=answers)

print("\n----------------------------------")
print("✅ Process Complete.")
Expand All @@ -43,5 +48,4 @@ def fill_form(self, user_input: str, fields: list, pdf_form_path: str):

except Exception as e:
print(f"An error occurred during PDF generation: {e}")
# Re-raise the exception so the frontend can handle it
raise e
10 changes: 3 additions & 7 deletions src/filler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ class Filler:
def __init__(self):
pass

def fill_form(self, pdf_form: str, llm: LLM):
def fill_form(self, pdf_form: str, answers: dict):
"""
Fill a PDF form with values from user_input using LLM.
Fill a PDF form with values from answers dictionary.
Fields are filled in the visual order (top-to-bottom, left-to-right).
"""
output_pdf = (
Expand All @@ -19,11 +19,7 @@ def fill_form(self, pdf_form: str, llm: LLM):
+ "_filled.pdf"
)

# Generate dictionary of answers from your original function
t2j = llm.main_loop()
textbox_answers = t2j.get_data() # This is a dictionary

answers_list = list(textbox_answers.values())
answers_list = list(answers.values())

# Read PDF
pdf = PdfReader(pdf_form)
Expand Down
9 changes: 7 additions & 2 deletions src/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def __init__(self, transcript_text=None, target_fields=None, json=None):
self._transcript_text = transcript_text # str
self._target_fields = target_fields # List, contains the template field.
self._json = json # dictionary
self._missing_fields = []

def type_check_all(self):
if type(self._transcript_text) is not str:
Expand Down Expand Up @@ -72,8 +73,9 @@ def main_loop(self):

# parse response
json_data = response.json()
parsed_response = json_data["response"]
# print(parsed_response)
parsed_response = json_data["response"].strip()
if parsed_response.replace('"', "") == "-1":
self._missing_fields.append(field)
self.add_response_to_json(field, parsed_response)

print("----------------------------------")
Expand Down Expand Up @@ -133,3 +135,6 @@ def handle_plural_values(self, plural_value):

def get_data(self):
    """Return the answers dictionary held by this instance.

    The stored object itself is returned, not a copy.
    """
    answers = self._json
    return answers

def get_missing_fields(self):
    """Return the list of fields recorded as missing on this instance.

    The stored list itself is returned, not a copy.
    """
    missing = self._missing_fields
    return missing
97 changes: 72 additions & 25 deletions tests/test_forms.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,72 @@
def test_submit_form(client):
pass
# First create a template
# form_payload = {
# "template_id": 3,
# "input_text": "Hi. The employee's name is John Doe. His job title is managing director. His department supervisor is Jane Doe. His phone number is 123456. His email is jdoe@ucsc.edu. The signature is <Mamañema>, and the date is 01/02/2005",
# }

# template_res = client.post("/templates/", json=template_payload)
# template_id = template_res.json()["id"]

# # Submit a form
# form_payload = {
# "template_id": template_id,
# "data": {"rating": 5, "comment": "Great service"},
# }

# response = client.post("/forms/", json=form_payload)

# assert response.status_code == 200

# data = response.json()
# assert data["id"] is not None
# assert data["template_id"] == template_id
# assert data["data"] == form_payload["data"]
from unittest.mock import patch, MagicMock

def _ollama_stub(field_name, answer):
    """Build a stand-in for ``requests.post`` as used by the LLM client.

    The returned callable replies with ``answer`` when ``field_name`` occurs
    in the prompt of the request payload, and with ``"-1"`` (the marker the
    test treats as "value missing") for every other prompt.
    """

    def fake_post(*args, **kwargs):
        prompt = kwargs.get("json", {}).get("prompt", "")
        reply = MagicMock()
        reply.json.return_value = {
            "response": answer if field_name in prompt else "-1"
        }
        return reply

    return fake_post


def test_interactive_feedback_loop(client):
    """A fill with a missing field can be completed through the feedback route."""
    # 1) Create a template
    template_payload = {
        "name": "Test Form",
        "pdf_path": "src/inputs/test.pdf",
        "fields": {
            "Employee name": "",
            "Job title": "",
        },
    }

    # prepare_form is patched out so template creation does not touch a real PDF.
    with patch("api.routes.templates.prepare_form"):
        template_res = client.post("/templates/create", json=template_payload)
        template_id = template_res.json()["id"]

    # 2) First fill: the model only knows the employee name, so "Job title"
    #    must come back as missing.
    form_payload = {
        "template_id": template_id,
        "input_text": "The employee name is John Doe.",
    }

    with (
        patch("src.llm.requests.post", side_effect=_ollama_stub("Employee name", "John Doe")),
        patch("os.path.exists", return_value=True),
    ):
        response = client.post("/forms/fill", json=form_payload)

    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "missing_data"
    assert "Job title" in data["missing_fields"]
    assert data["extracted_data"]["Employee name"] == "John Doe"
    submission_id = data["id"]

    # 3) Feedback: supply the job title; the PDF writer is patched so no file
    #    is produced.
    feedback_payload = {
        "input_text": "His job title is Engineer.",
    }

    with (
        patch("src.llm.requests.post", side_effect=_ollama_stub("Job title", "Engineer")),
        patch("src.filler.Filler.fill_form", return_value="output_path.pdf"),
        patch("os.path.exists", return_value=True),
    ):
        feedback_res = client.post(f"/forms/{submission_id}/feedback", json=feedback_payload)

    assert feedback_res.status_code == 200
    feedback_data = feedback_res.json()
    assert feedback_data["status"] == "completed"
    assert len(feedback_data["missing_fields"]) == 0
    assert feedback_data["extracted_data"]["Job title"] == "Engineer"
    assert feedback_data["extracted_data"]["Employee name"] == "John Doe"