Research data infrastructure for quantitative strategy development. It ingests OHLCV data from multiple sources, performs statistical validation (gap detection, outlier filtering, corporate action adjustment), and stores the results in a columnar format optimized for backtesting queries.
Status: Beta - Core functionality implemented and tested. See notebooks/examples.py for usage examples.
## Features

- Multi-Source Ingestion: Yahoo Finance, Polygon.io, and CSV/Parquet files
- Data Validation Pipeline: Statistical anomaly detection, gap filling, outlier filtering
- Corporate Action Adjustments: Automatic handling of splits, dividends, and mergers
- Point-in-Time Data: Prevents lookahead bias with as-of date queries
- High-Performance Storage: DuckDB/Parquet backend with sub-second query latency
- Research-Optimized Queries: Pre-built queries for common backtesting patterns (returns, volatility, correlation)
- Data Quality Metrics: Automated reporting on data completeness and integrity
- Polars-First Design: Built on Polars for high-performance data processing
## Architecture

```
┌───────────────────────────────────────────────────────────────────────────┐
│                               Data Sources                                │
├──────────────────┬──────────────────┬──────────────────┬──────────────────┤
│  Yahoo Finance   │    Polygon.io    │   CSV/Parquet    │  Custom Sources  │
└────────┬─────────┴────────┬─────────┴────────┬─────────┴────────┬─────────┘
         │                  │                  │                  │
         ▼                  ▼                  ▼                  ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                              Ingestion Layer                              │
│  • Source adapters (BaseDataSource interface)                             │
│  • Configurable data fetching with date ranges                            │
│  • Automatic column normalization                                         │
└─────────────────────────────────────┬─────────────────────────────────────┘
                                      │
                                      ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                            Validation Pipeline                            │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │
│  │     Gap     │   │   Outlier   │   │  Corporate  │   │   Quality   │    │
│  │  Detection  │ → │  Filtering  │ → │   Actions   │ → │   Scoring   │    │
│  └─────────────┘   └─────────────┘   └─────────────┘   └─────────────┘    │
└─────────────────────────────────────┬─────────────────────────────────────┘
                                      │
                                      ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                               Storage Layer                               │
│  • DuckDB for analytics queries (default)                                 │
│  • Parquet files for archival (partitioned by date)                       │
│  • Point-in-time snapshots for backtesting                                │
└─────────────────────────────────────┬─────────────────────────────────────┘
                                      │
                                      ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                         Query Layer (ResearchAPI)                         │
│  • OHLCV queries with filtering                                           │
│  • Returns calculation (daily/weekly/monthly, simple/log)                 │
│  • Rolling volatility and correlation analysis                            │
│  • Universe screening and summary statistics                              │
│  • Point-in-time queries (backtesting)                                    │
│  • Custom SQL support (DuckDB)                                            │
└───────────────────────────────────────────────────────────────────────────┘
```
## Installation

```bash
# Clone the repository
git clone https://github.com/pramurta/quant-data-pipeline.git
cd quant-data-pipeline
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Install in development mode
pip install -e .
```

## Configuration

```bash
# Copy example config (optional)
cp config/config.example.yaml config/config.yaml
# Set API keys (optional - for Polygon.io)
export POLYGON_API_KEY="your_key_here"
```

## Quick Start

```python
from src.pipeline import DataPipeline
from src.ingestion import YahooFinanceSource
# Initialize pipeline
pipeline = DataPipeline(storage_path="./data")
# Ingest data
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "META"]
pipeline.ingest(
    source=YahooFinanceSource(),
    symbols=symbols,
    start_date="2020-01-01",
    end_date="2024-01-01",
)
# Query data for research
df = pipeline.query.get_ohlcv(
    symbols=symbols,
    start_date="2023-01-01",
    end_date="2024-01-01",
    adjusted=True,  # Corporate action adjusted
)
# Calculate returns
returns = pipeline.query.get_returns(
    symbols=symbols,
    start_date="2023-01-01",
    frequency="daily",
)
# Get point-in-time data (prevents lookahead bias)
pit_data = pipeline.query.get_point_in_time(
    symbols=symbols,
    as_of_date="2023-06-15",
    lookback_days=252,
)
```

## Data Validation

The validation pipeline performs multiple checks on ingested data.

### Gap Detection
Identifies missing trading days and provides several fill methods:

```python
from src.validation import GapDetector
detector = GapDetector(
    max_gap_days=5,  # Flag gaps longer than 5 days
)
gaps = detector.detect(df)
filled_df = detector.fill(df, method="forward")  # or "linear", "interpolate"
```
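To make the fill semantics concrete, this is what forward filling does on a toy series (plain Polars, independent of GapDetector; the data is made up):

```python
import polars as pl

# Hypothetical series with a missing close on 2023-01-04.
toy = pl.DataFrame({
    "date": ["2023-01-03", "2023-01-04", "2023-01-05"],
    "close": [185.0, None, 187.5],
})

# Forward fill carries the last observed close across the gap, so
# 2023-01-04 becomes 185.0; "linear" would interpolate to 186.25 instead.
filled = toy.with_columns(pl.col("close").fill_null(strategy="forward"))
```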
### Outlier Filtering

Statistical methods for identifying anomalous price movements:

```python
from src.validation import OutlierDetector
detector = OutlierDetector(
    method="zscore",  # or "iqr", "isolation_forest"
    threshold=4.0,
)
outliers = detector.detect(df)
cleaned_df = detector.clean(df, method="clip")  # or "remove"
```
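For reference, the z-score rule flags observations whose distance from the mean exceeds `threshold` standard deviations. A minimal sketch of that test on per-symbol returns (conceptual only; the library's internals may differ):

```python
import polars as pl

# Flag a daily return r when |r - mean| / std > 4.0, computed per symbol.
flagged = (
    df.sort("symbol", "date")
      .with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
      .with_columns(
          (
              (pl.col("ret") - pl.col("ret").mean().over("symbol")).abs()
              / pl.col("ret").std().over("symbol")
              > 4.0
          ).alias("is_outlier")
      )
)
```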
### Corporate Action Adjustment

Handles splits and dividends:

```python
from src.validation import CorporateActionAdjuster
adjuster = CorporateActionAdjuster()
# Adjust historical prices for splits and dividends
adjusted_df = adjuster.adjust(df)
```
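The standard back-adjustment divides prices before a split by the split ratio (and multiplies volume by it) so that pre- and post-split bars are comparable. A hand-rolled version for a single, hypothetical 4-for-1 split, which `adjust` is meant to make unnecessary:

```python
from datetime import date

import polars as pl

split_date, ratio = date(2020, 8, 31), 4.0  # hypothetical 4-for-1 split
pre_split = pl.col("date") < split_date

manually_adjusted = df.with_columns(
    pl.when(pre_split).then(pl.col("close") / ratio).otherwise(pl.col("close")).alias("close"),
    pl.when(pre_split).then(pl.col("volume") * ratio).otherwise(pl.col("volume")).alias("volume"),
)
```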
### Quality Scoring

```python
from src.validation import QualityScorer

scorer = QualityScorer()
report = scorer.generate_report(df)
print(f"Completeness: {report['scores']['completeness']:.2%}")
print(f"Consistency: {report['scores']['consistency']:.2%}")
print(f"Overall Score: {report['scores']['overall']:.2%}")Data is stored efficiently using DuckDB (default) or Parquet files:
## Storage

Data is stored efficiently using DuckDB (default) or Parquet files.

DuckDB Backend (default):

```
data/
└── data.duckdb    # Single database file containing:
                   #   - ohlcv table (validated data)
                   #   - raw_data table (ingested data)
                   #   - snapshots table (point-in-time data)
```
Parquet Backend:

```
data/
└── parquet/
    └── ohlcv/                  # Partitioned by year/month
        └── year=2023/
            └── month=01/
                └── data.parquet
```
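With the Parquet backend, the hive-partitioned archive can also be scanned directly, for example with Polars (a sketch; the ResearchAPI below is the primary query interface):

```python
import polars as pl

# Lazily scan the partitioned archive; the year/month partition columns
# let the engine prune files before any data is read.
january = (
    pl.scan_parquet("data/parquet/ohlcv/**/*.parquet", hive_partitioning=True)
      .filter((pl.col("year") == 2023) & (pl.col("month") == 1))
      .select("symbol", "date", "close")
      .collect()
)
```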
## Research Queries

```python
# Get universe of stocks meeting criteria
universe = pipeline.query.screen(
    min_price=5.0,
    min_avg_volume=1_000_000,
    as_of_date="2023-12-31",
)
# Calculate rolling volatility
vol = pipeline.query.rolling_volatility(
    symbols=universe,
    window=20,
    annualize=True,
)
# Get correlation matrix
corr = pipeline.query.correlation_matrix(
    symbols=universe[:50],  # First 50 stocks from the screened universe
    start_date="2023-01-01",
    end_date="2023-12-31",
)
# Get summary statistics (returns, volatility, Sharpe ratio)
stats = pipeline.query.summary_statistics(
    symbols=["AAPL", "GOOGL", "MSFT"],
    start_date="2023-01-01",
    end_date="2023-12-31",
)
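# Note: the Sharpe ratio reported here is assumed to follow the usual
# daily-returns convention, mean(r) / std(r) * sqrt(252); whether a
# risk-free rate is subtracted is a detail of summary_statistics.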
# Custom SQL query (DuckDB backend only)
custom_df = pipeline.query.sql("""
    SELECT
        symbol,
        date,
        close,
        volume,
        close / LAG(close) OVER (PARTITION BY symbol ORDER BY date) - 1 AS return
    FROM ohlcv
    WHERE date >= '2023-01-01'
      AND symbol IN ('AAPL', 'GOOGL', 'MSFT')
    ORDER BY symbol, date
""")The pipeline is optimized for performance using:
- Polars for fast DataFrame operations (often 5-10x faster than pandas)
- DuckDB for analytical queries with columnar storage
- Vectorized operations for calculations (returns, volatility, etc.)
- Lazy evaluation where possible (see the sketch below)
Expected performance characteristics:
- OHLCV queries: Sub-second for most date ranges
- Returns calculation: Vectorized operations on millions of rows
- Correlation matrices: Efficient computation using numpy/polars
- Data validation: Parallel processing of validation rules
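As an illustration of the lazy, vectorized style the pipeline leans on, the following plain-Polars sketch (not the library's internal code) builds a single deferred plan for per-symbol returns and 20-day annualized volatility:

```python
import polars as pl

# Nothing is computed until .collect(); Polars can then prune columns,
# push filters down, and run the window operations vectorized.
plan = (
    pl.scan_parquet("data/parquet/ohlcv/**/*.parquet", hive_partitioning=True)
      .sort("symbol", "date")
      .with_columns(pl.col("close").pct_change().over("symbol").alias("ret"))
      .with_columns(
          (pl.col("ret").rolling_std(window_size=20).over("symbol") * 252**0.5)
          .alias("vol_20d_ann")
      )
)
result = plan.collect()
```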
## Project Structure

```
quant-data-pipeline/
├── src/
│   ├── __init__.py
│   ├── pipeline.py                  # Main pipeline orchestration
│   ├── ingestion/
│   │   ├── __init__.py
│   │   ├── base.py                  # Abstract base source
│   │   ├── yahoo.py                 # Yahoo Finance adapter
│   │   ├── polygon.py               # Polygon.io adapter
│   │   └── csv_source.py            # CSV/Parquet file source
│   ├── validation/
│   │   ├── __init__.py
│   │   ├── gaps.py                  # Gap detection and filling
│   │   ├── outliers.py              # Outlier detection
│   │   ├── corporate_actions.py     # Split/dividend adjustments
│   │   ├── quality.py               # Data quality scoring
│   │   └── pipeline.py              # Validation pipeline orchestration
│   ├── storage/
│   │   ├── __init__.py
│   │   ├── base.py                  # Storage interface
│   │   ├── duckdb_store.py          # DuckDB backend
│   │   └── parquet_store.py         # Parquet file backend
│   ├── query/
│   │   ├── __init__.py
│   │   └── research_api.py          # High-level research queries
│   └── utils/
│       ├── __init__.py
│       └── config.py                # Configuration management
├── tests/
│   ├── test_ingestion.py
│   ├── test_validation.py
│   ├── test_storage.py
│   └── test_query.py
├── config/
│   └── config.example.yaml
├── scripts/
│   └── ingest_daily.py              # Daily ingestion script
├── notebooks/
│   └── examples.py                  # Usage examples (jupytext format)
├── requirements.txt
├── pyproject.toml
├── setup.py
└── README.md
```
## Testing

```bash
# Run all tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=src --cov-report=html
# Run specific test module
pytest tests/test_validation.py -v
```

## Contributing

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
## License

This project is licensed under the MIT License; see the LICENSE file for details.