# Real-time streaming ML pipeline with monitoring
A comprehensive MLOps project for real-time cryptocurrency price prediction using Kafka, Kind, Kubernetes, uv, and Quix Streams. The project is structured with separate development and production environments, with the production cluster deployed on Civo Cloud.
```
mlops-llm-crypto-predictor/
├── dashboards/                   # Grafana dashboards
├── deployments/                  # Kubernetes manifests
│   ├── dev/                      # Development environment configurations
│   │   ├── candles/              # Candles service K8s manifests
│   │   ├── kind/                 # Kind cluster configuration
│   │   │   ├── manifests/        # Kafka, MLflow, etc. manifests
│   │   │   └── scripts/          # Cluster setup scripts
│   │   ├── prediction-generator/ # Prediction service K8s manifests
│   │   ├── technical-indicators/ # Technical indicators K8s manifests
│   │   └── trades/               # Trades service K8s manifests
│   └── prod/                     # Production environment configurations
│       ├── candles/              # Candles service K8s manifests
│       ├── prediction-generator/ # Prediction service K8s manifests
│       ├── technical-indicators/ # Technical indicators K8s manifests
│       └── trades/               # Trades service K8s manifests
├── docker/                       # Docker-related files and documentation
├── images/                       # Documentation images
├── services/                     # Microservices
│   ├── candles/                  # Candles aggregation service
│   ├── predictor/                # ML prediction service
│   ├── technical_indicators/     # Technical analysis service
│   └── trades/                   # Trades data ingestion service
├── .dockerignore                 # Docker ignore file
├── .gitignore                    # Git ignore file
├── .pre-commit-config.yaml       # Pre-commit configuration
├── LICENSE                       # Project license
├── Makefile                      # Project automation commands
├── pyproject.toml                # Project dependencies and configuration
└── README.md                     # Project documentation
```
Initialize the project from the root directory to create the main pyproject.toml file:

```bash
uv init
```

Create a trades workspace in the services directory:

```bash
cd services
uv init --lib trades
```

This creates a pyproject.toml file in the trades workspace with the src layout, includes the hatchling build system, and adds the trades workspace to the main pyproject.toml file.
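For reference, the generated services/trades/pyproject.toml looks roughly like this (a sketch based on uv's default `--lib` template; exact fields vary by uv version):

```toml
# Generated by `uv init --lib trades`; contents may vary by uv version
[project]
name = "trades"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = []

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```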
Each service is treated as a separate workspace, using the `[tool.uv.workspace]` section to define all workspace members.
To declare a workspace member in the main pyproject.toml:
```toml
[tool.uv.workspace]
members = ["services/trades"]
```

To add a workspace as a dependency:

```bash
uv add trades
```

This command:

- Adds `trades` to the dependencies list
- Adds an entry to the `[tool.uv.sources]` section in `pyproject.toml`, marking it as a workspace dependency:

```toml
[tool.uv.sources]
trades = { workspace = true }
```

Add dependencies to the main project:
```bash
uv add quixstreams
```

Add development dependencies using dependency groups:

```bash
uv add --group tests pytest
```

TA-Lib is required for technical analysis of financial data. Install it before adding it to the project:
- Follow the installation instructions at ta-lib.org
- Add the package to the project:

```bash
uv add ta-lib
```
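To verify the installation, compute a couple of indicators on dummy data (a quick smoke test, not project code):

```python
import numpy as np
import talib

# TA-Lib expects float64 arrays; random prices are fine for a smoke test
close = np.random.random(100)
rsi = talib.RSI(close, timeperiod=14)  # Relative Strength Index
sma = talib.SMA(close, timeperiod=20)  # Simple Moving Average
print(rsi[-1], sma[-1])
```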
Synchronize the project without installing dependency groups:

```bash
uv sync
```
Synchronize the project and install all dependency groups:

```bash
uv sync --all-groups
```

Create a Kind cluster with port mapping for Kafka:
```bash
chmod +x deployments/dev/kind/create_cluster.sh
./deployments/dev/kind/create_cluster.sh
```

The deployments/dev/kind directory contains:

- `kind-with-portmapping.yaml`: Kind configuration with port mapping for Kafka
- `manifests/`: Kafka configuration files
  - `kafka-e11b.yaml`: Creates 2 Kafka pods (`dual-role` and `entity-operator`)
  - `kafka-ui-all-in-one.yaml`: Creates the Kafka UI pod
- `install_kafka.sh`: Installs Kafka using Strimzi
- `install_kafka_ui.sh`: Installs Kafka UI
- `create_cluster.sh`: Runs all scripts to create the Kind cluster with Kafka and Kafka UI
After running the script, you should see the cluster in the k9s terminal, along with the services and the port mapping for Kafka and Kafka UI.
Forward the Kafka UI port to access it from your local machine:

```bash
kubectl -n kafka port-forward svc/kafka-ui 8182:8080
```

For background port forwarding using tmux:

```bash
tmux new-session -d 'kubectl -n kafka port-forward svc/kafka-ui 8182:8080'
```

Access the Kafka UI at http://localhost:8182.
Test the Kafka broker connection:

```bash
nc -vvv localhost 31234
```

The trades service connects to the Kraken API to get real-time trade data and produces it to the Kafka topic `trades`:

```bash
uv run services/trades/src/trades/main.py
```

You should see trade data in both the terminal and the Kafka UI.
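Under the hood, the service follows the standard Quix Streams producer pattern. A minimal sketch, assuming Kraken's v2 WebSocket trade channel and the NodePort from the nc test above (the real service's subscription payload and field handling may differ):

```python
import json

from quixstreams import Application
from websocket import create_connection  # websocket-client package

app = Application(broker_address="localhost:31234")
topic = app.topic(name="trades", value_serializer="json")

# Subscribe to Kraken's trade channel (v2 WebSocket API)
ws = create_connection("wss://ws.kraken.com/v2")
ws.send(json.dumps({
    "method": "subscribe",
    "params": {"channel": "trade", "symbol": ["BTC/EUR", "ETH/EUR"]},
}))

with app.get_producer() as producer:
    while True:
        msg = json.loads(ws.recv())
        if msg.get("channel") != "trade":
            continue  # skip heartbeats and subscription acks
        for trade in msg["data"]:
            serialized = topic.serialize(key=trade["symbol"], value=trade)
            producer.produce(topic=topic.name, key=serialized.key, value=serialized.value)
```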
By default, the number of partitions is 1. If you change it to 2, trade data is distributed across both partitions. Only deploy the candles service with 2 replicas once both partitions are receiving data.
Note: Changing partitions after deploying the candles service will cause errors. You'll need to delete and redeploy the candles service with the new partition count.
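For context, the candles service aggregates trades into OHLC candles over a tumbling window. A minimal sketch of the idea with Quix Streams, where the field names (`price`, `quantity`) and the 1-minute window are assumptions:

```python
from datetime import timedelta

from quixstreams import Application

app = Application(broker_address="localhost:31234", consumer_group="candles")
trades_topic = app.topic("trades", value_deserializer="json")
candles_topic = app.topic("candles", value_serializer="json")

def init_candle(trade: dict) -> dict:
    # First trade in the window seeds the candle
    price = trade["price"]
    return {"open": price, "high": price, "low": price,
            "close": price, "volume": trade["quantity"]}

def update_candle(candle: dict, trade: dict) -> dict:
    # Each subsequent trade updates high/low/close and volume
    candle["high"] = max(candle["high"], trade["price"])
    candle["low"] = min(candle["low"], trade["price"])
    candle["close"] = trade["price"]
    candle["volume"] += trade["quantity"]
    return candle

sdf = app.dataframe(topic=trades_topic)
sdf = (
    sdf.tumbling_window(timedelta(minutes=1))
    .reduce(reducer=update_candle, initializer=init_candle)
    .current()  # emit the candle-in-progress on every trade
)
sdf = sdf.to_topic(candles_topic)

if __name__ == "__main__":
    app.run()
```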
RisingWave is a streaming database for real-time analytics. Install it using:

```bash
chmod +x deployments/dev/kind/install_risingwave.sh
./deployments/dev/kind/install_risingwave.sh
```

Access the RisingWave database:

```bash
kubectl port-forward svc/risingwave -n risingwave 4567:4567
psql -h localhost -p 4567 -d dev -U root
```

Create a source in RisingWave to connect to Kafka. Example from query.sql:
```sql
CREATE SOURCE my_kafka_source (
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'user_activity',
    properties.bootstrap.server = 'broker1:9092,broker2:9092'
) FORMAT PLAIN ENCODE JSON;
```
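For this project's data, an equivalent table definition might look like the following. The column list mirrors the queries used below, but the topic name and the in-cluster bootstrap address are assumptions:

```sql
-- Hypothetical equivalent for this project's indicators topic
CREATE TABLE technical_indicators (
    pair VARCHAR,
    open DOUBLE PRECISION,
    high DOUBLE PRECISION,
    low DOUBLE PRECISION,
    close DOUBLE PRECISION,
    window_start_ms BIGINT,
    window_end_ms BIGINT
) WITH (
    connector = 'kafka',
    topic = 'technical_indicators',
    properties.bootstrap.server = 'kafka-e11b-kafka-bootstrap.kafka.svc.cluster.local:9092'
) FORMAT PLAIN ENCODE JSON;
```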
Query the technical indicators table:

```sql
SELECT * FROM technical_indicators LIMIT 10;
SELECT COUNT(*) FROM technical_indicators;
SELECT pair, COUNT(*) FROM technical_indicators GROUP BY pair;
```

MinIO stores the runtime state of streaming jobs. Forward the MinIO UI port:
```bash
kubectl port-forward -n risingwave svc/risingwave-minio 9001:9001
```

Log in with credentials from risingwave-values.yaml:
- Username: admin
- Password: minio-D0408AC0
Create access keys and add them to mlflow-minio-secret.yaml:
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: mlflow-minio-secret
  namespace: mlflow
type: Opaque
stringData:
  AccessKeyID: YOUR_ACCESS_KEY_ID
  SecretKey: YOUR_SECRET_KEY
```

Apply the secret:

```bash
kubectl create namespace mlflow
kubectl apply -f deployments/dev/kind/manifests/mlflow-minio-secret.yaml
```

Uninstall RisingWave:

```bash
helm uninstall risingwave -n risingwave
```

Install Grafana for data visualization:
```bash
chmod +x deployments/dev/kind/install_grafana.sh
./deployments/dev/kind/install_grafana.sh
```

Access the Grafana UI:
```bash
kubectl port-forward -n monitoring svc/grafana 3000:80
```

Log in with credentials from grafana-values.yaml:
- Username: admin
- Password: grafana
Add a PostgreSQL data source with these settings:
- Name: grafana-postgresql-datasource
- Host URL: risingwave.risingwave.svc.cluster.local:4567
- Database: dev
- User: root
- TLS/SSL Mode: disable
Create a dashboard with SQL queries like:

```sql
SELECT open, high, low, close, pair, window_start_ms, window_end_ms,
       to_timestamp(window_end_ms / 1000) AS time
FROM technical_indicators
LIMIT 10;

SELECT open, high, low, close, pair, window_start_ms, window_end_ms,
       to_timestamp(window_end_ms / 1000) AS time
FROM technical_indicators
WHERE pair = 'ETH/EUR'
ORDER BY window_end_ms DESC
LIMIT 10;
```

Export the dashboard JSON model from Settings and save it in the dashboards folder for future imports.
Uninstall Grafana:

```bash
helm uninstall grafana -n monitoring
```

Connect to the RisingWave PostgreSQL instance:

```bash
kubectl exec -it -n risingwave risingwave-postgresql-0 -- bash
psql -U postgres -h risingwave-postgresql.risingwave.svc.cluster.local  # password: postgres
```

Create the database and user:
```sql
CREATE USER mlflow WITH ENCRYPTED PASSWORD 'mlflow';
CREATE DATABASE mlflow WITH ENCODING = 'UTF8' OWNER = mlflow;
CREATE DATABASE mlflow_auth WITH ENCODING = 'UTF8' OWNER = mlflow;
```

Create the MLflow tracking secret:
```bash
kubectl apply -f deployments/dev/kind/manifests/mlflow-tracking-secret.yaml
```

Or create it manually:

```bash
kubectl create secret generic mlflow-tracking \
  --from-literal=admin-user='your-user' \
  --from-literal=admin-password='your-password' \
  --namespace=mlflow
```

Install MLflow:
```bash
helm upgrade --install --create-namespace --wait mlflow \
  oci://registry-1.docker.io/bitnamicharts/mlflow \
  --namespace=mlflow \
  --values deployments/dev/kind/manifests/mlflow-values.yaml
```

Get the MLflow credentials:
```bash
kubectl get secret --namespace mlflow mlflow-tracking -o jsonpath="{.data.admin-user}" | base64 -d
kubectl get secrets -n mlflow mlflow-tracking -o json | jq -r '.data."admin-password"' | base64 -d
```

Uninstall MLflow:

```bash
helm uninstall mlflow -n mlflow
```

Note: To run the train.py script in the predictor service, you need to port-forward both the MLflow tracking server and the RisingWave database.
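For reference, the MLflow side of train.py reduces to pointing the client at the port-forwarded tracking server and logging runs under an experiment. A minimal sketch, assuming a local port-forward on 5000 and the credentials from the mlflow-tracking secret (experiment name and metric are illustrative):

```python
import os

import mlflow

# Assumes `kubectl port-forward` has exposed the tracking server locally
mlflow.set_tracking_uri("http://localhost:5000")
os.environ["MLFLOW_TRACKING_USERNAME"] = "your-user"      # from the mlflow-tracking secret
os.environ["MLFLOW_TRACKING_PASSWORD"] = "your-password"

mlflow.set_experiment("crypto-predictor")
with mlflow.start_run():
    mlflow.log_param("model_type", "baseline")
    mlflow.log_metric("mae", 0.42)  # illustrative value
```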
The training pipeline is a Kubernetes CronJob that runs the train.py script in the predictor service:

```bash
make cron-kustomize-training
```
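The manifest behind this target is roughly shaped like the following sketch; the schedule, image path, and command are assumptions (only the rwml namespace appears in the commands below):

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: training-pipeline
  namespace: rwml
spec:
  schedule: "0 * * * *"  # hourly; the real schedule may differ
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: predictor-training
              image: ghcr.io/GITHUB_USERNAME/predictor:latest  # placeholder image
              command: ["uv", "run", "src/predictor/train.py"]
```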
Check the training pipeline:

```bash
kubectl get cronjobs -n rwml
kubectl describe cronjobs -n rwml
```

Deploy the prediction generator service:

```bash
make cron-kustomize-prediction
```

This service reads from the technical indicators table and generates predictions using the trained model.
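Conceptually, one prediction cycle reads the latest indicators row from RisingWave, loads the registered model from MLflow, and predicts. A minimal sketch, assuming port-forwarded services and a hypothetical registered model named `crypto-predictor`:

```python
import mlflow
import pandas as pd
import psycopg2

# Assumes port-forwarded MLflow (5000) and RisingWave (4567)
mlflow.set_tracking_uri("http://localhost:5000")
model = mlflow.pyfunc.load_model("models:/crypto-predictor/latest")

conn = psycopg2.connect(host="localhost", port=4567, dbname="dev", user="root")
features = pd.read_sql(
    "SELECT * FROM technical_indicators "
    "WHERE pair = 'ETH/EUR' ORDER BY window_end_ms DESC LIMIT 1",
    conn,
)
prediction = model.predict(features.drop(columns=["pair"]))
print(prediction)
```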
The project includes a comprehensive Makefile with useful commands:

```bash
make start-kind-cluster             # Start the Kind cluster with port mapping
make stop-kind-cluster              # Stop the Kind cluster
make dev service=trades             # Run a specific service in development mode
make build-for-dev service=trades   # Build a service's Docker image for development
make push-for-dev service=trades    # Push a service's Docker image to the Kind cluster
make deploy-for-dev service=trades  # Deploy a service to the Kind cluster
```
Verify deployments:

```bash
kubectl get deployments --all-namespaces
```

Other useful targets:

```bash
make tmux-port-forward-mlflow  # Port forward MLflow UI with tmux
make ruff                      # Run Ruff linter with auto-fix
make mypy                      # Run MyPy static type checker
make clean                     # Clean up cached files and build artifacts
make all                       # Run ruff, mypy, and clean in sequence
make help                      # Display all available make commands with descriptions
```

The project uses pre-commit hooks for code quality and consistency:
```bash
pre-commit install          # Install pre-commit hooks
pre-commit autoupdate       # Update hooks to latest versions
pre-commit run --all-files  # Run hooks manually
```
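The repo's .pre-commit-config.yaml pins the exact hooks; a typical configuration for this stack might look like:

```yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.9  # illustrative pin; run `pre-commit autoupdate` to refresh
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format
```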
Check resource usage:

```bash
kubectl get pods --all-namespaces -o custom-columns="NAMESPACE:.metadata.namespace,NAME:.metadata.name,CPU_REQUEST:.spec.containers[*].resources.requests.cpu,MEMORY_REQUEST:.spec.containers[*].resources.requests.memory,CPU_LIMIT:.spec.containers[*].resources.limits.cpu,MEMORY_LIMIT:.spec.containers[*].resources.limits.memory"
```

Resource requirements:
- CPU (Requests/Limits): ~2.2 vCPU / ~6-7 vCPU
- Memory (Requests/Limits): ~4.5-5Gi / ~10-11Gi
Install Metrics Server:

```bash
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.7.2/components.yaml
```

Modify the Metrics Server deployment to add `--kubelet-insecure-tls` to the args.
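One way to do this is with a JSON patch (the container index 0 assumes Metrics Server is the first container in the pod spec):

```bash
kubectl patch deployment metrics-server -n kube-system --type='json' \
  -p='[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls"}]'
```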
Check CPU and memory usage:

```bash
kubectl top nodes
kubectl top pods --all-namespaces
```

For the production cluster, choose at least:
- 1 node with 3-4 vCPU and 7-8 GiB RAM, or
- 2 nodes with 2 vCPU and 4 GiB RAM each for redundancy
Get Civo instance sizes:

```bash
curl -s -H "Authorization: bearer <your-api-key>" https://api.civo.com/v2/sizes \
  | jq -r '[.[] | select(.selectable == true)]
      | ["Name","Type","CPU","RAM (MiB)","Disk (GB)"],
        (.[] | [.name, .type, (.cpu_cores|tostring), (.ram_mb|tostring), (.disk_gb|tostring)])
      | @tsv' \
  | column -t
```

Recommended Civo plans:
- One node: `g4s.kube.large` (4 vCPU / 8 GiB RAM), $21.75/month
- Two nodes: `g4s.kube.medium` (2 vCPU / 4 GiB RAM each), $43.50/month
Switch to the Civo cluster:

```bash
export KUBECONFIG=~/.kube/config:~/.kube/civo-crypto-cluster-kubeconfig
kubectl config get-contexts
kubectl config use-context <your-civo-context-name>
```

Create a namespace:

```bash
kubectl create namespace ben
```

Authenticate to GitHub Container Registry:
```bash
kubectl create secret docker-registry ghcr-creds \
  --docker-server=https://ghcr.io \
  --docker-username=GITHUB_USERNAME \
  --docker-password=YOUR_GITHUB_PAT \
  --namespace=ben
```
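Production Deployments then reference this secret through imagePullSecrets; a snippet of the relevant pod spec fields (the container name and image path are placeholders):

```yaml
spec:
  template:
    spec:
      imagePullSecrets:
        - name: ghcr-creds
      containers:
        - name: trades
          image: ghcr.io/GITHUB_USERNAME/trades:latest
```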
Install Kafka and the Kafka UI on the production cluster by running the install scripts:

```bash
chmod +x deployments/prod/kind/install_kafka_prod.sh deployments/prod/kind/install_kafka_ui_prod.sh
./deployments/prod/kind/install_kafka_prod.sh
./deployments/prod/kind/install_kafka_ui_prod.sh
```

Add the NodePort to the Civo Firewall inbound rules and restrict it to your CIDR.
Deploy services:
kubectl apply -f deployments/prod/trades/trades.yaml
kubectl apply -f deployments/prod/candles/candles.yaml
kubectl apply -f deployments/prod/technical-indicators/technical-indicators.yamlAccess the Kafka UI at http://<your-civo-cluster-ip>:<NodePort>.
If pods fail with CreateContainerConfigError, check:

- Image availability: `docker exec rwml-34fa-control-plane crictl images | grep <service-name>`
- ConfigMap and Secret references
- Environment variables
If services can't connect to RisingWave:

- Check if RisingWave is running: `kubectl get pods -n risingwave`
- Verify connection details in ConfigMaps
- Port-forward for local testing: `kubectl port-forward svc/risingwave -n risingwave 4567:4567`
To restart a service after Docker restarts:

```bash
kubectl rollout restart deployment <service-name> -n <namespace>
```

Or delete the pod to trigger automatic recreation:

```bash
kubectl delete pod <pod-name> -n <namespace>
```

This project is licensed under the MIT License - see the LICENSE file for details.