-
Notifications
You must be signed in to change notification settings - Fork 0
Add RAG service: PDF ingestion, Chroma DB indexing, and FastAPI query endpoint #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| GROQ_API_KEY=your_groq_api_key_here |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import os | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
|
|
||
| import yaml | ||
| from dotenv import load_dotenv | ||
|
|
||
| load_dotenv() | ||
|
|
||
|
|
||
@dataclass
class Settings:
    """Runtime configuration for the RAG service (from config.yaml + environment)."""

    corpus_path: str  # folder scanned for *.pdf files during ingestion
    chunk_size: int  # max characters per text chunk
    chunk_overlap: int  # characters shared between consecutive chunks
    retrieval_k: int  # number of chunks retrieved per query (n_results)
    model: str  # presumably the Groq chat model name — confirm against RAGEngine.query
    embedding_model: str  # sentence-transformers model used by Chroma's embedding function
    chroma_path: str  # on-disk path of the persistent Chroma store
    collection_name: str  # Chroma collection name
    groq_api_key: str  # read from GROQ_API_KEY env var; empty string when unset
|
|
||
|
|
||
def load_settings(config_path: str = "config.yaml") -> Settings:
    """Load service settings from a YAML file, with environment overrides.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        A populated ``Settings``. Missing keys fall back to portable
        defaults; the corpus location may also be supplied via the
        ``CORPUS_PATH`` environment variable (takes precedence).

    Raises:
        FileNotFoundError: If the config file does not exist.
    """
    cfg_file = Path(config_path)
    if not cfg_file.exists():
        raise FileNotFoundError(f"Missing config file: {cfg_file}")

    with cfg_file.open("r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f) or {}

    groq_api_key = os.getenv("GROQ_API_KEY", "")

    # Fix: default to a relative path (and allow an env override) instead of
    # the previous absolute Windows path, which broke on any other machine.
    corpus_path = os.getenv("CORPUS_PATH") or cfg.get("corpus_path", "./corpus")

    return Settings(
        corpus_path=corpus_path,
        chunk_size=int(cfg.get("chunk_size", 1000)),
        chunk_overlap=int(cfg.get("chunk_overlap", 150)),
        retrieval_k=int(cfg.get("retrieval_k", 4)),
        model=cfg.get("model", "llama-3.1-8b-instant"),
        embedding_model=cfg.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2"),
        chroma_path=cfg.get("chroma_path", ".chroma"),
        collection_name=cfg.get("collection_name", "intercom_pdfs"),
        groq_api_key=groq_api_key,
    )
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from fastapi import FastAPI, HTTPException | ||
| from pydantic import BaseModel | ||
|
|
||
| from app.config import load_settings | ||
| from app.rag import RAGEngine | ||
|
|
||
# Build configuration and the RAG engine once at import time so a single
# engine (and its Chroma collection) is shared across all requests.
settings = load_settings()
rag = RAGEngine(settings)

app = FastAPI(title="Evalens Tracer-Bullet RAG")
|
|
||
|
|
||
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    # Free-text user question; RAGEngine.query raises ValueError on blank input.
    query: str
|
|
||
|
|
||
@app.on_event("startup")  # NOTE(review): on_event is deprecated in newer FastAPI — consider a lifespan handler
def startup_event() -> None:
    """Index the PDF corpus once when the server starts.

    Blocks startup until ingestion finishes; RAGEngine.ingest skips
    re-indexing when the collection already contains documents.
    """
    rag.ingest()
|
Comment on lines
+19
to
+21
|
||
|
|
||
|
Comment on lines
+19
to
+22
|
||
|
|
||
@app.post("/query")
def query_api(payload: QueryRequest):
    """Answer a user query via the RAG engine.

    Maps ValueError (bad input) to HTTP 400 and any other failure to
    HTTP 500, carrying the underlying message as the response detail.
    """
    try:
        result = rag.query(payload.query)
    except ValueError as bad_input:
        raise HTTPException(status_code=400, detail=str(bad_input)) from bad_input
    except Exception as unexpected:  # minimal tracer-bullet error handling
        raise HTTPException(status_code=500, detail=str(unexpected)) from unexpected
    return result
|
Comment on lines
+30
to
+31
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,139 @@ | ||||||
| from __future__ import annotations | ||||||
|
|
||||||
| from pathlib import Path | ||||||
| from typing import Any | ||||||
|
|
||||||
| import chromadb | ||||||
| from chromadb.config import Settings as ChromaSettings | ||||||
| from chromadb.utils import embedding_functions | ||||||
| from groq import Groq | ||||||
| import fitz | ||||||
|
|
||||||
| from app.config import Settings | ||||||
|
|
||||||
|
|
||||||
| class RAGEngine: | ||||||
| def __init__(self, settings: Settings) -> None: | ||||||
| self.settings = settings | ||||||
| self._chroma = chromadb.PersistentClient( | ||||||
| path=settings.chroma_path, | ||||||
| settings=ChromaSettings(allow_reset=False), | ||||||
| ) | ||||||
| self._embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction( | ||||||
| model_name=settings.embedding_model, | ||||||
| ) | ||||||
| self._collection = self._chroma.get_or_create_collection( | ||||||
| name=settings.collection_name, | ||||||
| embedding_function=self._embedding_fn, | ||||||
| metadata={"hnsw:space": "cosine"}, | ||||||
| ) | ||||||
|
|
||||||
| self._groq = Groq(api_key=settings.groq_api_key) if settings.groq_api_key else None | ||||||
|
|
||||||
| @staticmethod | ||||||
| def _chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]: | ||||||
| text = " ".join(text.split()) | ||||||
| if not text: | ||||||
| return [] | ||||||
|
|
||||||
| chunks: list[str] = [] | ||||||
| start = 0 | ||||||
| step = max(chunk_size - chunk_overlap, 1) | ||||||
| while start < len(text): | ||||||
| end = start + chunk_size | ||||||
| chunks.append(text[start:end]) | ||||||
| start += step | ||||||
| return chunks | ||||||
|
|
||||||
| def ingest(self) -> dict[str, Any]: | ||||||
| corpus = Path(self.settings.corpus_path) | ||||||
| if not corpus.exists(): | ||||||
| raise FileNotFoundError(f"Corpus folder not found: {corpus}") | ||||||
|
|
||||||
| pdf_paths = sorted(corpus.glob("*.pdf")) | ||||||
| if not pdf_paths: | ||||||
| return {"indexed_files": 0, "indexed_chunks": 0} | ||||||
|
Comment on lines
+48
to
+55
|
||||||
|
|
||||||
| existing_count = self._collection.count() | ||||||
| if existing_count > 0: | ||||||
| return {"indexed_files": 0, "indexed_chunks": existing_count, "skipped": True} | ||||||
|
|
||||||
| ids: list[str] = [] | ||||||
| docs: list[str] = [] | ||||||
| metas: list[dict[str, Any]] = [] | ||||||
|
|
||||||
| for pdf_path in pdf_paths: | ||||||
| with fitz.open(pdf_path) as pdf_doc: | ||||||
| full_text = "\n".join(page.get_text("text") for page in pdf_doc) | ||||||
| chunks = self._chunk_text( | ||||||
| full_text, | ||||||
| self.settings.chunk_size, | ||||||
| self.settings.chunk_overlap, | ||||||
| ) | ||||||
| for idx, chunk in enumerate(chunks): | ||||||
| ids.append(f"{pdf_path.stem}-{idx}") | ||||||
| docs.append(chunk) | ||||||
| metas.append({"source": str(pdf_path), "chunk_index": idx}) | ||||||
|
|
||||||
| if ids: | ||||||
| self._collection.add(ids=ids, documents=docs, metadatas=metas) | ||||||
|
|
||||||
|
Comment on lines
+61
to
+80
|
||||||
| return {"indexed_files": len(pdf_paths), "indexed_chunks": len(ids), "skipped": False} | ||||||
|
|
||||||
| def query(self, user_query: str) -> dict[str, Any]: | ||||||
| if not user_query.strip(): | ||||||
| raise ValueError("query cannot be empty") | ||||||
|
|
||||||
| results = self._collection.query( | ||||||
| query_texts=[user_query], | ||||||
| n_results=self.settings.retrieval_k, | ||||||
| include=["documents", "metadatas", "distances"], | ||||||
|
||||||
| include=["documents", "metadatas", "distances"], | |
| include=["documents", "metadatas"], |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,8 @@ | ||||||
| corpus_path: "D:/Evalens/corpus/intercom_external/raw_pdfs" | ||||||
|
||||||
| corpus_path: "D:/Evalens/corpus/intercom_external/raw_pdfs" | |
| corpus_path: "./corpus" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| fastapi | ||
| uvicorn | ||
| pymupdf | ||
| chromadb | ||
| sentence-transformers | ||
| groq | ||
| python-dotenv | ||
| pyyaml |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
load_settings() defaults corpus_path to an absolute Windows path. Even if config.yaml is changed later, this fallback will still break portability when the key is missing; prefer a relative default (or require corpus_path in config) and optionally support overriding via an environment variable.