Document Ingestion Pipeline

A highly scalable, durable, and reliable multi-document ingestion and Retrieval-Augmented Generation (RAG) system built with DBOS, LlamaIndex, FastAPI, and Milvus.

(Image: image/README/1767934207165.png)

Overview

This project provides a production-ready document ingestion and RAG system that allows you to:

  • Upload multiple documents in various formats
  • Process documents asynchronously using distributed workers
  • Store document embeddings in a vector database
  • Chat with your documents using natural language
  • Scale horizontally with DBOS's durable workflow orchestration

The system is designed for reliability and scalability, using DBOS to ensure that document processing workflows are durable and can recover from failures without data loss.

Key Features

  • Multi-Format Support: Ingest PDFs, Word documents, PowerPoint presentations, images, text files, and more using the Unstructured data loader
  • Scalable Architecture: Distribute workload across multiple DBOS workers
  • Durable Workflows: Automatic recovery from failures with DBOS
  • Real-time Progress Tracking: Monitor document processing progress via workflow events
  • Vector Search: Fast semantic search powered by Milvus vector database
  • RAG-Powered Chat: Interact with your documents using state-of-the-art LLMs (Groq)
  • REST API: Easy integration with any application via FastAPI

Technology Stack

Core Framework

  • DBOS - Durable workflow orchestration and queue management
  • FastAPI - Modern, high-performance web framework
  • LlamaIndex - Data framework for RAG applications

AI & ML Components

  • Milvus - High-performance vector database
  • Groq - Fast LLM inference
  • Ollama - Local embedding models
  • Unstructured - Document parsing and processing (via LlamaIndex)

Additional Libraries

  • SQLAlchemy - Database ORM
  • psycopg2 - PostgreSQL adapter
  • python-dotenv - Environment configuration
  • pytest - Testing framework

Architecture

The system follows a distributed architecture, separating a stateless FastAPI server from horizontally scalable DBOS workers:

┌─────────────┐
│   Client    │
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────────┐
│         FastAPI Server                  │
│  ┌──────────────┐  ┌─────────────────┐ │
│  │  /file-upload│  │     /chat       │ │
│  └──────┬───────┘  └────────┬────────┘ │
└─────────┼────────────────────┼──────────┘
          │                    │
          ▼                    │
    ┌──────────┐               │
    │   DBOS   │               │
    │  Queue   │               │
    └────┬─────┘               │
         │                     │
         ▼                     ▼
┌───────────────────┐   ┌───────────────┐
│  DBOS Workers     │   │    Milvus     │
│  (Distributed)    │───│Vector Database│
│                   │   └───────────────┘
│  ┌──────────────┐ │
│  │ Unstructured │ │
│  │    Reader    │ │
│  └──────────────┘ │
└───────────────────┘

Workflow:

  1. User uploads documents via REST API
  2. Server enqueues processing tasks in DBOS queue
  3. Multiple workers pick up tasks and process documents in parallel
  4. Workers use UnstructuredReader to parse documents
  5. Documents are chunked (512 tokens), embedded (Ollama), and stored in Milvus
  6. Users query documents via chat endpoint using Groq LLM with RAG
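
As a rough sketch of steps 2 and 3, the enqueue side could look like the following with the DBOS Python SDK. The queue name here is an illustrative assumption; the actual workflow names are described under Core Components below.

from dbos import DBOS, Queue

# Assumed queue name, for illustration only.
indexing_queue = Queue("indexing_queue")

@DBOS.workflow()
def index_single_file(path: str) -> None:
    ...  # parse, embed, and store one document (see ingestion/steps.py)

@DBOS.workflow()
def index_uploaded_files(file_paths: list[str]) -> None:
    # Fan out one durable child workflow per file; DBOS persists each
    # enqueued task, so a crashed worker resumes without losing work.
    handles = [indexing_queue.enqueue(index_single_file, p) for p in file_paths]
    for handle in handles:
        handle.get_result()  # block until each child workflow completes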

Prerequisites

Before you begin, ensure you have the following installed:

  • Python 3.13+
  • PostgreSQL (for DBOS system database)
  • Milvus (vector database - can use Zilliz Cloud for managed service)
  • Ollama (for local embeddings) - https://ollama.ai/
  • UV (Python package manager, optional but recommended) - https://docs.astral.sh/uv/

External Services

You'll need API keys for:

  • DBOS Cloud (or self-hosted DBOS system database)
  • Groq API (for LLM inference) - https://console.groq.com
  • Milvus/Zilliz (vector database)

Installation

1. Clone the Repository

git clone https://github.com/devgomesai/DocumentIngestionPipeline.git
cd DocumentIngestionPipeline

2. Create Virtual Environment

python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

Or using UV:

uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

3. Install Dependencies

pip install -e .

Or using UV:

uv pip install -e .

4. Install Ollama and Pull Embedding Model

# Install Ollama from https://ollama.ai/
ollama pull nomic-embed-text:latest

Configuration

1. Create Environment File

Copy the example environment file and fill in your credentials:

cp .env.example .env

2. Configure Environment Variables

Edit .env with your configuration:

# DBOS Configurations
DBOS_SYSTEM_DATABASE_URL=
CONDUCTOR_KEY=

# DBOS Application name
APPLICATION_NAME=llama-rag

# Milvus VectorDB Configurations
MILVUS_ENDPOINT=
MILVUS_TOKEN=
MILVUS_COLLECTION=

# GROQ Configurations
GROQ_API_KEY=
GROQ_MODEL=openai/gpt-oss-120b

# OLLAMA Configurations
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_EMBEDDING_MODEL=nomic-embed-text:latest

Configuration Details

  • DBOS_SYSTEM_DATABASE_URL: PostgreSQL connection string for DBOS state management
  • MILVUS_ENDPOINT: Milvus server endpoint (local or Zilliz Cloud)
  • GROQ_API_KEY: API key from Groq Console
  • OLLAMA_BASE_URL: Local Ollama server URL (default: http://localhost:11434)
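
For reference, a minimal sketch of how these variables might be read at startup with python-dotenv (the variable names match the .env above; where the project actually loads them is not shown here):

import os
from dotenv import load_dotenv

load_dotenv()  # read .env from the project root

MILVUS_ENDPOINT = os.environ["MILVUS_ENDPOINT"]  # required
GROQ_API_KEY = os.environ["GROQ_API_KEY"]        # required
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")  # optional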

Usage

Starting the API Server

The FastAPI server provides REST endpoints for document upload and chat:

uvicorn ingestion.server:app --reload

The server will start at http://localhost:8000

  • API Documentation: http://localhost:8000/docs
  • Alternative API Docs: http://localhost:8000/redoc

Starting DBOS Workers

Workers process documents from the queue. You can start multiple workers for parallel processing:

# Start first worker
uv run worker.py

In separate terminals, start additional workers:

# Worker 2
uv run worker.py

Each worker runs with a unique process ID and can handle tasks independently:

🚀 DBOS ingestion worker started (PID 12345)
=== Milvus index initialized ===
[WORKER 12345] Parsing data/uuid_document.pdf
[INDEXED] doc_id=abc123

Uploading Documents

Using cURL

curl -X POST "http://localhost:8000/file-upload" \
  -F "files=@/path/to/document.pdf" \
  -F "files=@/path/to/document2.docx"

Chatting with Documents

Once documents are indexed, you can chat with them:

Using cURL

curl -X POST "http://localhost:8000/chat" \
  -H "Content-Type: application/json" \
  -d '{"message": "What are the main topics covered in these documents?"}'

Project Structure

DocumentIngestionPipeline/
├── ingestion/
│   ├── __init__.py
│   ├── server.py          # FastAPI application with upload and chat endpoints
│   ├── workflows.py       # DBOS workflow definitions
│   ├── steps.py           # Document parsing and indexing steps
│   └── index.py           # Vector store and chat engine initialization
├── samples/               # Sample documents for testing
├── pytest/                # Test files and fixtures
├── data/                  # Uploaded documents (created at runtime)
├── image/                 # Documentation images
├── .env.example           # Example environment configuration
├── .gitignore            # Git ignore rules
├── dbos-config.yaml      # DBOS configuration file
├── pyproject.toml        # Project dependencies and metadata
├── uv.lock               # UV lock file
├── worker.py             # DBOS worker entry point
└── README.md             # This file

Core Components

ingestion/server.py

FastAPI application providing REST endpoints:

  • POST /file-upload: Upload and enqueue documents for processing
  • POST /chat: Query documents using natural language
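
A minimal sketch of what the upload endpoint might look like; the save path, response shape, and workflow import are assumptions, and the real implementation lives in ingestion/server.py:

from pathlib import Path
from uuid import uuid4

from dbos import DBOS
from fastapi import FastAPI, File, UploadFile

from ingestion.workflows import index_uploaded_files  # assumed import path

app = FastAPI()

@app.post("/file-upload")
async def file_upload(files: list[UploadFile] = File(...)):
    Path("data").mkdir(exist_ok=True)
    paths = []
    for upload in files:
        # Prefix with a UUID so concurrent uploads never collide.
        dest = Path("data") / f"{uuid4()}_{upload.filename}"
        dest.write_bytes(await upload.read())
        paths.append(str(dest))
    # Start the durable workflow in the background and return its ID
    # so clients can track progress via workflow events.
    handle = DBOS.start_workflow(index_uploaded_files, paths)
    return {"workflow_id": handle.workflow_id}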

ingestion/workflows.py

DBOS workflows for durable document processing:

  • index_uploaded_files: Orchestrates batch document processing
  • index_single_file: Processes individual documents
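
These workflows can surface the real-time progress tracking mentioned under Key Features through DBOS workflow events. A small sketch of the client side, assuming the workflow publishes under an event key named "progress":

from dbos import DBOS

def poll_progress(workflow_id: str):
    # Inside the workflow, progress would be published with
    # DBOS.set_event("progress", ...); this blocks up to 10 s for the value.
    return DBOS.get_event(workflow_id, "progress", timeout_seconds=10)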

ingestion/steps.py

DBOS steps for atomic operations:

  • parse_uploaded_file: Parse documents using UnstructuredReader
  • index_and_store_docs: Create embeddings with Ollama and store in Milvus vector database
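
A sketch of what these steps could look like with LlamaIndex's UnstructuredReader (the function bodies are simplified assumptions):

from pathlib import Path

from dbos import DBOS
from llama_index.core import Document
from llama_index.readers.file import UnstructuredReader

@DBOS.step()
def parse_uploaded_file(path: str) -> list[Document]:
    # Unstructured parses PDFs, Office documents, images, and more.
    return UnstructuredReader().load_data(file=Path(path))

@DBOS.step()
def index_and_store_docs(docs: list[Document]) -> None:
    ...  # embed with Ollama and insert into the Milvus index (see ingestion/index.py)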

ingestion/index.py

Vector store and RAG components:

  • Milvus vector store configuration (768-dimensional embeddings)
  • LlamaIndex settings: chunk size (512 tokens), overlap (50 tokens)
  • Embedding model: Ollama with nomic-embed-text
  • LLM: Groq API
  • Chat engine initialization with async support
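
Based on the settings listed above, initialization could look roughly like this (environment variable names follow the .env shown earlier; the exact wiring in ingestion/index.py may differ):

import os

from llama_index.core import Settings, VectorStoreIndex
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.groq import Groq
from llama_index.vector_stores.milvus import MilvusVectorStore

# Global LlamaIndex settings: 512-token chunks with 50-token overlap.
Settings.chunk_size = 512
Settings.chunk_overlap = 50
Settings.embed_model = OllamaEmbedding(
    model_name=os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text:latest"),
    base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
)
Settings.llm = Groq(model=os.environ["GROQ_MODEL"], api_key=os.environ["GROQ_API_KEY"])

# nomic-embed-text produces 768-dimensional vectors.
vector_store = MilvusVectorStore(
    uri=os.environ["MILVUS_ENDPOINT"],
    token=os.environ["MILVUS_TOKEN"],
    collection_name=os.environ["MILVUS_COLLECTION"],
    dim=768,
)
index = VectorStoreIndex.from_vector_store(vector_store)
chat_engine = index.as_chat_engine()  # supports async queries via achat()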

worker.py

DBOS worker process that:

  • Connects to DBOS system database
  • Registers workflows and steps
  • Processes tasks from the indexing queue
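
A minimal sketch of such an entry point (the DBOSConfig keys follow recent releases of the DBOS Python SDK and may differ by version):

import os
import threading

from dbos import DBOS, DBOSConfig
from dotenv import load_dotenv

load_dotenv()

config: DBOSConfig = {
    "name": os.getenv("APPLICATION_NAME", "llama-rag"),
    "system_database_url": os.environ["DBOS_SYSTEM_DATABASE_URL"],
}
DBOS(config=config)

import ingestion.workflows  # noqa: E402,F401  -- registers workflows and steps

DBOS.launch()  # connect to the system database and start serving the queue
print(f"DBOS ingestion worker started (PID {os.getpid()})")
threading.Event().wait()  # keep the process alive so it keeps dequeuing tasks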

License

This project is provided as-is for educational and commercial use. Please check individual dependencies for their respective licenses.

Acknowledgments

  • DBOS - For providing durable workflow orchestration
  • LlamaIndex - For the excellent RAG framework
  • Milvus - For high-performance vector search
  • Groq - For fast LLM inference
