From 311ac91ef8f44d024da4d7931cda30ede421d66b Mon Sep 17 00:00:00 2001
From: Naveen Mandava <104335221+NaveenBuidl@users.noreply.github.com>
Date: Wed, 8 Apr 2026 08:52:49 +0200
Subject: [PATCH] Switch PDF extraction to PyMuPDF for ingestion

---
 .env.example     |   1 +
 app/config.py    |  46 ++++++++++++++++
 app/main.py      |  31 +++++++++++
 app/rag.py       | 139 +++++++++++++++++++++++++++++++++++++++++++++++
 config.yaml      |   8 +++
 requirements.txt |   8 +++
 6 files changed, 233 insertions(+)
 create mode 100644 app/config.py
 create mode 100644 app/main.py
 create mode 100644 app/rag.py
 create mode 100644 config.yaml
 create mode 100644 requirements.txt

diff --git a/.env.example b/.env.example
index e69de29..df42afd 100644
--- a/.env.example
+++ b/.env.example
@@ -0,0 +1 @@
+GROQ_API_KEY=your_groq_api_key_here
diff --git a/app/config.py b/app/config.py
new file mode 100644
index 0000000..77eaef3
--- /dev/null
+++ b/app/config.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from pathlib import Path
+
+import yaml
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+@dataclass
+class Settings:
+    corpus_path: str
+    chunk_size: int
+    chunk_overlap: int
+    retrieval_k: int
+    model: str
+    embedding_model: str
+    chroma_path: str
+    collection_name: str
+    groq_api_key: str
+
+
+def load_settings(config_path: str = "config.yaml") -> Settings:
+    cfg_file = Path(config_path)
+    if not cfg_file.exists():
+        raise FileNotFoundError(f"Missing config file: {cfg_file}")
+
+    with cfg_file.open("r", encoding="utf-8") as f:
+        cfg = yaml.safe_load(f) or {}
+
+    groq_api_key = os.getenv("GROQ_API_KEY", "")
+
+    return Settings(
+        corpus_path=cfg.get("corpus_path", "D:/Evalens/corpus/intercom_external/raw_pdfs"),
+        chunk_size=int(cfg.get("chunk_size", 1000)),
+        chunk_overlap=int(cfg.get("chunk_overlap", 150)),
+        retrieval_k=int(cfg.get("retrieval_k", 4)),
+        model=cfg.get("model", "llama-3.1-8b-instant"),
+        embedding_model=cfg.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2"),
+        chroma_path=cfg.get("chroma_path", ".chroma"),
+        collection_name=cfg.get("collection_name", "intercom_pdfs"),
+        groq_api_key=groq_api_key,
+    )
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..655375c
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from app.config import load_settings
+from app.rag import RAGEngine
+
+settings = load_settings()
+rag = RAGEngine(settings)
+
+app = FastAPI(title="Evalens Tracer-Bullet RAG")
+
+
+class QueryRequest(BaseModel):
+    query: str
+
+
+@app.on_event("startup")
+def startup_event() -> None:
+    rag.ingest()
+
+
+@app.post("/query")
+def query_api(payload: QueryRequest):
+    try:
+        return rag.query(payload.query)
+    except ValueError as e:
+        raise HTTPException(status_code=400, detail=str(e)) from e
+    except Exception as e:  # minimal tracer-bullet error handling
+        raise HTTPException(status_code=500, detail=str(e)) from e
diff --git a/app/rag.py b/app/rag.py
new file mode 100644
index 0000000..536ade0
--- /dev/null
+++ b/app/rag.py
@@ -0,0 +1,139 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+import chromadb
+from chromadb.config import Settings as ChromaSettings
+from chromadb.utils import embedding_functions
+from groq import Groq
+import fitz
+
+from app.config import Settings
+
+
+class RAGEngine:
+    def __init__(self, settings: Settings) -> None:
+        self.settings = settings
+        self._chroma = chromadb.PersistentClient(
+            path=settings.chroma_path,
+            settings=ChromaSettings(allow_reset=False),
+        )
+        self._embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
+            model_name=settings.embedding_model,
+        )
+        self._collection = self._chroma.get_or_create_collection(
+            name=settings.collection_name,
+            embedding_function=self._embedding_fn,
+            metadata={"hnsw:space": "cosine"},
+        )
+
+        self._groq = Groq(api_key=settings.groq_api_key) if settings.groq_api_key else None
+
+    @staticmethod
+    def _chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
+        text = " ".join(text.split())
+        if not text:
+            return []
+
+        chunks: list[str] = []
+        start = 0
+        step = max(chunk_size - chunk_overlap, 1)
+        while start < len(text):
+            end = start + chunk_size
+            chunks.append(text[start:end])
+            start += step
+        return chunks
+
+    def ingest(self) -> dict[str, Any]:
+        corpus = Path(self.settings.corpus_path)
+        if not corpus.exists():
+            raise FileNotFoundError(f"Corpus folder not found: {corpus}")
+
+        pdf_paths = sorted(corpus.glob("*.pdf"))
+        if not pdf_paths:
+            return {"indexed_files": 0, "indexed_chunks": 0}
+
+        existing_count = self._collection.count()
+        if existing_count > 0:
+            return {"indexed_files": 0, "indexed_chunks": existing_count, "skipped": True}
+
+        ids: list[str] = []
+        docs: list[str] = []
+        metas: list[dict[str, Any]] = []
+
+        for pdf_path in pdf_paths:
+            with fitz.open(pdf_path) as pdf_doc:
+                full_text = "\n".join(page.get_text("text") for page in pdf_doc)
+            chunks = self._chunk_text(
+                full_text,
+                self.settings.chunk_size,
+                self.settings.chunk_overlap,
+            )
+            for idx, chunk in enumerate(chunks):
+                ids.append(f"{pdf_path.stem}-{idx}")
+                docs.append(chunk)
+                metas.append({"source": str(pdf_path), "chunk_index": idx})
+
+        if ids:
+            self._collection.add(ids=ids, documents=docs, metadatas=metas)
+
+        return {"indexed_files": len(pdf_paths), "indexed_chunks": len(ids), "skipped": False}
+
+    def query(self, user_query: str) -> dict[str, Any]:
+        if not user_query.strip():
+            raise ValueError("query cannot be empty")
+
+        results = self._collection.query(
+            query_texts=[user_query],
+            n_results=self.settings.retrieval_k,
+            include=["documents", "metadatas", "distances"],
+        )
+
+        documents = results.get("documents", [[]])[0]
+        metadatas = results.get("metadatas", [[]])[0]
+
+        chunks = []
+        sources = []
+        for doc, meta in zip(documents, metadatas):
+            source = (meta or {}).get("source", "")
+            chunks.append(doc)
+            if source and source not in sources:
+                sources.append(source)
+
+        answer = self._generate_answer(user_query, chunks)
+
+        return {
+            "query": user_query,
+            "answer": answer,
+            "retrieved_chunks": chunks,
+            "sources": sources,
+            "model": self.settings.model,
+            "retrieval_k": self.settings.retrieval_k,
+            "chunk_size": self.settings.chunk_size,
+        }
+
+    def _generate_answer(self, query: str, chunks: list[str]) -> str:
+        if not chunks:
+            return "I couldn't find relevant context in the indexed PDF corpus."
+
+        context = "\n\n".join(chunks[: self.settings.retrieval_k])
+
+        if not self._groq:
+            return "Set GROQ_API_KEY to enable generation. Retrieved context is available in retrieved_chunks."
+
+        system = (
+            "You answer questions using only the provided context. "
+            "If context is insufficient, say so briefly."
+        )
+        user = f"Context:\n{context}\n\nQuestion: {query}"
+
+        completion = self._groq.chat.completions.create(
+            model=self.settings.model,
+            temperature=0,
+            messages=[
+                {"role": "system", "content": system},
+                {"role": "user", "content": user},
+            ],
+        )
+        return completion.choices[0].message.content or ""
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..646571e
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,8 @@
+corpus_path: "D:/Evalens/corpus/intercom_external/raw_pdfs"
+chunk_size: 1000
+chunk_overlap: 150
+retrieval_k: 4
+model: "llama-3.1-8b-instant"
+embedding_model: "sentence-transformers/all-MiniLM-L6-v2"
+chroma_path: ".chroma"
+collection_name: "intercom_pdfs"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..8594511
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+fastapi
+uvicorn
+pymupdf
+chromadb
+sentence-transformers
+groq
+python-dotenv
+pyyaml
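
Note (not part of the patch): RAGEngine._chunk_text slides a fixed window over
the whitespace-collapsed text, advancing chunk_size - chunk_overlap characters
per step. A minimal standalone sketch of the same arithmetic, using the
config.yaml defaults (chunk_size=1000, chunk_overlap=150):

    # chunk_math.py -- illustrative only; mirrors the patch's _chunk_text logic.
    def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
        text = " ".join(text.split())  # collapse whitespace, as in the patch
        if not text:
            return []
        chunks, start = [], 0
        step = max(chunk_size - chunk_overlap, 1)  # guard against overlap >= size
        while start < len(text):
            chunks.append(text[start:start + chunk_size])
            start += step
        return chunks

    # With the defaults the window advances 850 characters per chunk, so
    # 10,000 characters of extracted text yield 12 overlapping chunks.
    assert len(chunk_text("x" * 10_000, 1000, 150)) == 12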
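
Note (not part of the patch): a quick end-to-end smoke test. It assumes the
service is running locally (e.g. `uvicorn app.main:app --port 8000`; uvicorn
is in requirements.txt) and that the `requests` package is installed
separately (it is not in requirements.txt). Ingestion runs once at startup,
and ingest() skips indexing whenever the collection is non-empty, so delete
the .chroma directory to force a re-index after changing the corpus.

    # smoke_test.py -- illustrative only; host, port, and the sample question
    # are assumptions, not part of the patch.
    import requests

    resp = requests.post(
        "http://localhost:8000/query",
        json={"query": "How do I get started?"},  # placeholder question
        timeout=120,  # allow for retrieval plus Groq generation
    )
    resp.raise_for_status()
    body = resp.json()
    print(body["answer"])
    print("sources:", body["sources"])

Without GROQ_API_KEY set, the same request still returns the retrieved
chunks; the "answer" field then explains that generation is disabled.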