Pipeline de datos para procesamiento de información agrícola en AWS.
Objetivo: Ingestar un CSV "rinde_lotes.csv" y "clima_diario.csv" a un bucket S3/curated en Parquet particionado por campaña y lote. Validar: rangos de rinde, % nulos, consistencia de fechas; exponer una vista para BI (Athena).
S3 Landing → Step Functions → Glue Jobs → S3 Curated → Crawlers → Glue Catalog → Athena ↓ Data Quality (Great Expectations) ↓ Resultados en S3 (dq_results/)
- Ingesta: CSV → S3 Landing
- Procesamiento: AWS Glue (PySpark)
- Orquestación: AWS Step Functions
- Catálogo: AWS Glue Data Catalog
- Data Quality: Great Expectations
- Consumo: Amazon Athena
agro_data/
├── .github/workflows/ # CI/CD
├── infra/ # Terraform
├── src/ # Código fuente
│ ├── ingestion/ # Jobs de Glue
│ └── dq/ # Data Quality
└── orchestration/ # Step Functions
- Buckets S3: landing, curated, scripts
- Roles IAM con mínimo privilegio
- Jobs de Glue (PySpark)
- Crawlers para actualizar catálogo
- Step Functions para orquestación
- Base de datos en Glue Catalog
- Lectura de CSVs desde landing
- Validaciones de rango (rinde 0-20000, temp -20-50, precip 0-500)
- Control de nulos en columnas críticas
- Escritura en formato Parquet particionado (campaña/lote)
- Suite de validaciones para rinde_lotes
- Suite de validaciones para clima_diario
- Resultados almacenados en S3 (dq_results/)
- Reintentos automáticos para sincronización de catálogo
- Flujo secuencial: Rinde → Clima → Crawlers → DQ Rinde → DQ Clima
- Manejo de errores y reintentos
- Próxima mejora: Ejecución programada (CloudWatch Events)
Se proveen los archivos de configuración de perfiles apropiados para los perfiles: (carpeta iam)
- Perfil administrador (terraform apply)
- Perfil ingestión (solo escritura a landing)
- Perfil BI (solo lectura a curated y Athena)
Es posible monitorear mediante:
- Logs en CloudWatch
- Métricas de ejecución
- Resultados DQ visibles en S3
- S3 (50GB) almacenamiento y operaciones: $1.15
- Glue (2 jobs x 10min/día) 2DPU: $8.40
- Step Functions (30 ejecuciones): $1.00
- Athena (10GB escaneados): $0.50
- Total: ~usd 11.40/mes
"Para escenarios de menor volumen (<1GB), podríamos reemplazar Glue Spark por Pandas en AWS Lambda"
| Cambio | Impacto | Ahorro |
|---|---|---|
| Spark (2 DPU) → Pandas (Lambda 1GB) | +3s de latencia | -65% |
| Glue Spark Jobs → Lambda (128MB) | Procesamiento batch a demanda | -$5.50/mes |
| Total optimizado | ~$5.90/mes |
Para escalar a terabytes y reducir latencia, optimizamos la configuración de Spark y el particionado
| Cambio | Impacto | Costo adicional |
|---|---|---|
| Aumentar workers (2 → 5) | -40% tiempo procesamiento | +120% |
| Particionado por fecha+hora | Consultas 3x más rápidas | +15% (más archivos) |
| Usar Glue Workflows | Pipeline optimizado | Sin costo extra |
| Total optimizado | 60% más rápido | +35% ($15.40/mes) |
- AWS CLI configurado
- Terraform >= 1.0
- Python 3.9+
# 1. Clonar repositorio
git clone agro_data
cd agro_data
# Instalar dependencias
pip install -r requirements-dev.txt
# Desplegar infraestructura
cd infra && terraform apply
# Subir scripts necesarios para el funcionamiento
./scripts/upload_scripts.sh
# Subir datos de ejemplo de la carpeta /data
./scripts/upload_data.sh# Ejecutar pipeline (luego de que la infra está lista)
./scripts/run_step_function.sh
# Ver resultados DQ
aws s3 ls s3://agro-data-pipeline-dev-curated/dq_results/ --recursive# Consultar en Athena (en su consola de queries)
SELECT * FROM 'agro-data-pipeline_dev_db'.'rinde_lotes' LIMIT 10;
# Realizar una consulta SQL desde la cli de aws
aws athena start-query-execution \
--query-string "SELECT * FROM 'agro-data-pipeline_dev_db'.'rinde_lotes' LIMIT 10;" \
--result-configuration "OutputLocation=s3://agro-data-pipeline-dev-curated/athena-results/" \
--output text \
--query 'QueryExecutionId'
# Ver archivos Parquet generados
aws s3 ls s3://agro-data-pipeline-dev-curated/rinde_lotes/ --recursive
aws s3 ls s3://agro-data-pipeline-dev-curated/clima_diario/ --recursiveLos jobs son idempotentes porque:
- Sobrescriben particiones con mode("overwrite")
- Procesan archivo por archivo con timestamp en nombre
- Si el mismo archivo se procesa dos veces → mismo resultado
Great Expectations valida:
- No nulos en columnas críticas
- Rangos de rinde (0-20000)
- Rangos climáticos (temp -20/50, precip 0-500)
- Formato de fechas YYYY-MM-DD
- Athena: Consultas SQL directas
$ pytest tests/unit -v
======================================== test session starts ========================================
platform linux -- Python 3.11.2, pytest-7.4.0, pluggy-1.6.0
collected 14 items
tests/unit/test_data_samples.py::test_lectura_rinde_csv ✓ [ 7%]
tests/unit/test_data_samples.py::test_lectura_clima_csv ✓ [14%]
tests/unit/test_data_samples.py::test_filas_invalidas_rinde ✓ [21%]
tests/unit/test_validators.py::TestRindeValidator::test_rinde_valido ✓ [28%]
tests/unit/test_validators.py::TestRindeValidator::test_rinde_invalido ✓ [35%]
tests/unit/test_validators.py::TestRindeValidator::test_rinde_limites ✓ [42%]
tests/unit/test_validators.py::TestTemperaturaValidator::test_temperatura_valida ✓ [50%]
tests/unit/test_validators.py::TestTemperaturaValidator::test_temperatura_invalida ✓ [57%]
tests/unit/test_validators.py::TestPrecipitacionValidator::test_precipitacion_valida ✓ [64%]
tests/unit/test_validators.py::TestPrecipitacionValidator::test_precipitacion_invalida ✓ [71%]
tests/unit/test_validators.py::TestFechaValidator::test_fecha_valida ✓ [78%]
tests/unit/test_validators.py::TestFechaValidator::test_fecha_invalida ✓ [85%]
tests/unit/test_validators.py::TestNotNullValidator::test_not_null_valido ✓ [92%]
tests/unit/test_validators.py::TestNotNullValidator::test_not_null_invalido ✓ [100%]
===================================== 14 passed in 0.06s =====================================




