This document provides a comprehensive guide to the Phase 6 ingestion pipeline for large legal corpora.
The Phase 6 pipeline ingests, normalizes, embeds, and indexes legal documents from authoritative sources including:
- U.S. Constitution
- United States Code (54 titles)
- Code of Federal Regulations (50 titles)
- California Constitution and Statutes
- Federal and state case law
The pipeline consists of five main components:
- Ingestion (ingest.py) - Loads raw documents and creates normalized JSON
- Normalization (normalize.py) - Chunks text and creates canonical format
- Embedding (embeddings.py) - Generates TF-IDF vectors for semantic search
- Indexing (retriever.py) - Builds vector index for similarity search
- Analysis (analyzer.py) - Detects anomalies and quality issues
Ensure dependencies are installed:
pip install -e ".[dev]"
pip install -r requirements.txt
Place raw legal documents in data/sources/:
# Example: Download from authoritative sources
# U.S. Code: https://uscode.house.gov/
# CFR: https://www.ecfr.gov/
# CA Codes: https://leginfo.legislature.ca.gov/
Supported formats:
- .txt - Plain text
- .md - Markdown
- .json - Pre-normalized JSON
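As a rough illustration of how the supported formats might be filtered before ingestion, the sketch below collects matching files from a source directory. The helper name `find_source_files` is hypothetical, and recursing into subdirectories is an assumption; the actual ingest.py may discover files differently.

```python
from pathlib import Path

# Extensions taken from the list of supported formats above.
SUPPORTED_EXTENSIONS = {".txt", ".md", ".json"}


def find_source_files(source_dir):
    """Return supported files under source_dir, sorted for a reproducible order.

    Hypothetical helper: recursion into subdirectories is an assumption.
    """
    root = Path(source_dir)
    if not root.is_dir():
        return []
    return sorted(
        path
        for path in root.rglob("*")
        if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
    )


if __name__ == "__main__":
    for path in find_source_files("data/sources"):
        print(path)
```

Sorting the result keeps the ingestion order stable between runs, which fits the pipeline's emphasis on reproducibility.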
# Basic ingestion
python scripts/ingest_and_index.py --source data/sources --out data/cases
# With anomaly detection
python scripts/ingest_and_index.py --source data/sources --out data/cases --analyze
# Specify jurisdiction
python scripts/ingest_and_index.py --source data/sources --jurisdiction federal --analyze
Check outputs:
- Normalized documents: data/cases/*.json
- Vector embeddings: data/vectors/collection_vectors.npy
- Audit reports: data/reports/audit_report.json
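A quick way to confirm that a run produced the outputs listed above is to check for them on disk. This is a minimal sketch, assuming the default layout under data/; the function name `check_outputs` is hypothetical and not part of the project.

```python
from pathlib import Path


def check_outputs(base_dir="data"):
    """Summarize which of the expected pipeline outputs exist under base_dir."""
    base = Path(base_dir)
    cases_dir = base / "cases"
    # Count normalized documents only if the output directory exists.
    n_docs = len(list(cases_dir.glob("*.json"))) if cases_dir.is_dir() else 0
    return {
        "normalized_documents": n_docs,
        "vectors_present": (base / "vectors" / "collection_vectors.npy").is_file(),
        "audit_report_present": (base / "reports" / "audit_report.json").is_file(),
    }


if __name__ == "__main__":
    print(check_outputs())
```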
All normalized documents conform to schemas/legal_schema.json:
{
  "id": "unique-document-id",
  "title": "Document Title",
  "jurisdiction": "federal|california|unknown",
  "source": "path/to/source.txt",
  "source_url": "https://authoritative-source.gov/...",
  "version_date": "2024-01-01",
  "ingest_timestamp": "2025-11-13T06:00:00Z",
  "checksum": "sha256-hash-of-text",
  "citations": ["42 U.S.C. § 1983"],
  "metadata": {
    "processor_version": "0.1.0",
    "transformations": ["ingest", "normalize"]
  },
  "text": "Full document text..."
}
Every document includes:
- checksum: SHA-256 hash for integrity verification
- ingest_timestamp: UTC timestamp of ingestion
- source: Original file path
- metadata: Processor version and transformation history
This ensures reproducibility and audit trails.
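To make the provenance fields concrete, here is a minimal sketch of how a record could be stamped and later re-verified. The helper names are hypothetical, and hashing the UTF-8 encoding of the text is an assumption; the actual ingest.py may compute the checksum over different bytes.

```python
import hashlib
from datetime import datetime, timezone


def sha256_of_text(text):
    """Hex SHA-256 digest of the text (UTF-8 encoding is an assumption)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def build_record(doc_id, title, text, source, jurisdiction="unknown"):
    """Assemble a dict carrying the provenance fields from the schema above.

    Hypothetical helper: the real pipeline may build records differently.
    """
    now_utc = datetime.now(timezone.utc)
    return {
        "id": doc_id,
        "title": title,
        "jurisdiction": jurisdiction,
        "source": source,
        "ingest_timestamp": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "checksum": sha256_of_text(text),
        "metadata": {"processor_version": "0.1.0", "transformations": ["ingest"]},
        "text": text,
    }


def checksum_matches(record):
    """True if the stored checksum still matches the record's text."""
    return record["checksum"] == sha256_of_text(record["text"])
```

Re-running `checksum_matches` on a stored record is a cheap integrity check: any edit to the text after ingestion changes the digest.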
The pipeline uses TF-IDF (Term Frequency-Inverse Document Frequency) for vector embeddings:
- Deterministic: Same input always produces same output
- Reproducible: No external API calls or randomness
- Efficient: Fast computation, no GPU required
- Semantic: Captures term importance and document similarity through shared vocabulary (lexical overlap rather than learned meaning)
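For readers unfamiliar with TF-IDF, the following sketch shows a textbook version in plain Python. It is not the LocalEmbedder implementation: the whitespace tokenizer, the smoothed IDF formula (the same variant scikit-learn uses by default), and all function names here are assumptions chosen for illustration.

```python
import math
from collections import Counter


def tokenize(text):
    """Lowercase whitespace tokenizer (a deliberate simplification)."""
    return text.lower().split()


def fit_idf(corpus):
    """Smoothed inverse document frequency for every term in the corpus."""
    n_docs = len(corpus)
    doc_freq = Counter()
    for text in corpus:
        doc_freq.update(set(tokenize(text)))
    return {
        term: math.log((1 + n_docs) / (1 + df)) + 1.0
        for term, df in doc_freq.items()
    }


def tfidf_vector(text, idf):
    """L2-normalized TF-IDF weights as a sparse dict; unseen terms are dropped."""
    counts = Counter(t for t in tokenize(text) if t in idf)
    weights = {t: c * idf[t] for t, c in counts.items()}
    norm = math.sqrt(sum(w * w for w in weights.values()))
    return {t: w / norm for t, w in weights.items()} if norm else {}


def cosine(a, b):
    """Cosine similarity of two L2-normalized sparse vectors."""
    return sum(w * b.get(t, 0.0) for t, w in a.items())
```

Because every step is a pure function of the corpus, fitting twice on the same documents yields identical weights, which is the determinism property listed above.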
from oraculus_di_auditor.embeddings import LocalEmbedder
# Create embedder
embedder = LocalEmbedder(max_features=2048)
# Fit on corpus
embedder.fit(documents)
# Save vocabulary for consistency
embedder.save_vocabulary("data/vectors/vocab.pkl")
# Load later
embedder.load_vocabulary("data/vectors/vocab.pkl")
The analyzer detects:
- Long sentences (> 1000 chars)
- Missing citations (patterns in text not in citations array)
- Contradictory dates (years differing by > 50 years)
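The three checks above can be approximated with regular expressions. The sketch below is not the analyzer's actual logic: the citation pattern only covers U.S.C. citations, the sentence splitter is naive, and the anomaly field names are hypothetical. The return shape mirrors the id/anomalies/count structure used by find_anomalies.

```python
import re

# Assumed pattern: matches citations such as "42 U.S.C. § 1983" only.
CITATION_RE = re.compile(r"\b\d+\s+U\.S\.C\.\s+§+\s*\d+[A-Za-z0-9\-]*")
YEAR_RE = re.compile(r"\b(?:1[6-9]\d{2}|20\d{2})\b")
MAX_SENTENCE_CHARS = 1000
MAX_YEAR_SPAN = 50


def sketch_anomalies(document):
    """Approximate the three documented checks on a normalized document dict."""
    text = document.get("text", "")
    anomalies = []

    # Long sentences: naive split on whitespace that follows ., ! or ?
    for sentence in re.split(r"(?<=[.!?])\s+", text):
        if len(sentence) > MAX_SENTENCE_CHARS:
            anomalies.append({"type": "long_sentence", "length": len(sentence)})

    # Missing citations: found in the text but absent from the citations array.
    listed = set(document.get("citations", []))
    for citation in sorted(set(CITATION_RE.findall(text)) - listed):
        anomalies.append({"type": "missing_citation", "citation": citation})

    # Contradictory dates: strip citations first so "§ 1983" is not read as a year.
    years = [int(y) for y in YEAR_RE.findall(CITATION_RE.sub(" ", text))]
    if years and max(years) - min(years) > MAX_YEAR_SPAN:
        anomalies.append({"type": "date_span", "years": [min(years), max(years)]})

    return {"id": document.get("id"), "anomalies": anomalies, "count": len(anomalies)}
```

Removing citation spans before scanning for years matters in legal text, since section numbers such as 1983 would otherwise look like dates.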
Example:
from oraculus_di_auditor.analyzer import find_anomalies
result = find_anomalies(document)
# {
# "id": "doc-id",
# "anomalies": [...],
# "count": 2
# }
Search for similar documents:
from oraculus_di_auditor.retriever import Retriever
from oraculus_di_auditor.embeddings import LocalEmbedder
# Load index
retriever = Retriever()
retriever.load("collection")
# Load embedder
embedder = LocalEmbedder()
embedder.load_vocabulary("data/vectors/collection_vocab.pkl")
# Query
query = "civil rights violations"
query_vec = embedder.embed(query)
results = retriever.search(query_vec, top_k=5)
for idx, score, metadata in results:
    print(f"{metadata['title']}: {score:.3f}")
For local development:
- Use TF-IDF vectors and NumPy index
- Keep raw files in data/sources/ (gitignored)
- Commit only small normalized samples to repo
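At small scale, a vector index can be little more than a list of row vectors scanned with cosine similarity. The sketch below shows that idea in plain Python for clarity; a NumPy version would replace the loops with a matrix-vector product. The function names are hypothetical and this is not the Retriever implementation, though the result tuples follow the (idx, score, metadata) shape used in the search example.

```python
import heapq
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length dense vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_search(index_vectors, metadata, query_vec, top_k=5):
    """Return (idx, score, metadata) tuples for the top_k most similar rows."""
    scored = (
        (i, cosine_similarity(vec, query_vec), metadata[i])
        for i, vec in enumerate(index_vectors)
    )
    return heapq.nlargest(top_k, scored, key=lambda item: item[1])
```

A linear scan like this costs time proportional to the number of documents per query, which is usually acceptable for small samples and is the main reason to move to a dedicated vector store for larger corpora.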
For production-scale deployments:
- Vector storage: FAISS, Milvus, or Weaviate
- Raw storage: S3 or network-attached storage
- Parallel processing: Message queue (Celery/RabbitMQ)
- Monitoring: Prometheus + structured logging
For large corpora (e.g., all 54 USC titles):
- Download in batches: One title at a time
- Validate checksums: Ensure data integrity
- Incremental indexing: Build per-title indexes
- Merge indexes: Combine after validation
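The per-title batching described above can be scripted rather than typed by hand. This is a sketch under stated assumptions: it presumes one directory per title named title_N under data/sources, and it passes only the --source flag. The helper names are hypothetical.

```python
import subprocess
import sys
from pathlib import Path

SCRIPT = "scripts/ingest_and_index.py"


def build_commands(sources_root, titles):
    """One ingestion command per title directory, e.g. data/sources/title_1."""
    commands = []
    for n in titles:
        title_dir = Path(sources_root) / f"title_{n}"
        commands.append([sys.executable, SCRIPT, "--source", title_dir.as_posix()])
    return commands


def run_batches(commands):
    """Run the commands in order and stop at the first failure."""
    for cmd in commands:
        print("running:", " ".join(cmd))
        subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Titles 1 through 54; adjust the range to the titles actually downloaded.
    run_batches(build_commands("data/sources", range(1, 55)))
```

Using `check=True` halts the loop when a title fails, so a bad download can be fixed and validated before later titles are indexed.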
Example workflow:
# Ingest Title 1
python scripts/ingest_and_index.py --source data/sources/title_1
# Ingest Title 2
python scripts/ingest_and_index.py --source data/sources/title_2
# Continue for all titles...
- Constitution: Public domain
- U.S. Code: https://uscode.house.gov/ (bulk downloads)
- CFR: https://www.ecfr.gov/ (XML/JSON formats)
- Federal Register: https://www.federalregister.gov/
- CA Constitution: https://leginfo.legislature.ca.gov/
- CA Codes: https://leginfo.legislature.ca.gov/faces/codes.xhtml
- CourtListener: https://www.courtlistener.com/ (bulk data available)
- SCOTUS: https://www.supremecourt.gov/
- RECAP: https://free.law/recap/
- Public domain only: Only ingest official public documents
- Gitignore raw files: Keep data/sources/ out of repo
- Private sources: Use data/sources/private/ (also gitignored)
- No secrets: Never commit API keys or credentials
Run tests:
# All tests
pytest -v
# Specific modules
pytest tests/test_ingest_module.py tests/test_embeddings_module.py -v
# With coverage
pytest --cov=src/oraculus_di_auditor --cov-report=term-missing
The CI pipeline runs:
- Code formatting (black)
- Linting (ruff)
- Tests (pytest)
- Coverage reporting
See .github/workflows/python-ci.yml for details.
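If no documents are found, create the source directory and add files:
mkdir -p data/sources
# Add some .txt files
If embedding fails because the embedder has not been fitted:
# Ensure you fit the embedder before calling embed()
embedder.fit(corpus)
If memory usage is too high:
- Reduce max_features in LocalEmbedder
- Process documents in smaller batches
- Use external storage for large files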
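Processing documents in smaller batches can be done with a small generator like the one below. The name `batched` is hypothetical and not part of the project; it works on any iterable, so documents can be streamed from disk instead of loaded all at once.

```python
from itertools import islice


def batched(items, batch_size):
    """Yield lists of at most batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in batched(range(10), 4):
        print(batch)
```

With TF-IDF, the vocabulary generally has to be fit on the whole corpus first, so batching mostly helps at the embedding and indexing stages that follow.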
- Download sample corpora from authoritative sources
- Run pipeline on small samples (1-3 docs per corpus)
- Validate outputs and reports
- Scale up to larger batches
- Implement production storage (S3, FAISS, etc.)