ninajafli/threatflow


ThreatFlow

A real-time and batch threat-analytics platform on GCP. Synthetic network-flow events pass through Confluent Kafka, are processed two ways (Spark Structured Streaming for live aggregates, batch MapReduce + PySpark on Dataproc for historical analysis), and are rendered in a Streamlit dashboard.

Data flow

Real-time path: the producer publishes JSON events at about 100 per second to the threatflow-flows topic. A Spark Structured Streaming job consumes them, computes per-minute counts grouped by attack label, and writes the updates to the threatflow-aggregates topic. The Streamlit dashboard subscribes to that topic and renders the counts.
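The per-minute aggregation can be illustrated without Spark. The sketch below is a plain-Python analogue, not the repository's streaming job: it buckets events into one-minute tumbling windows and counts them per label. The field names `ts` (epoch seconds) and `label` are assumptions, since the event schema is not shown in this README.

```python
import json
from collections import Counter


def minute_bucket(ts: float) -> int:
    """Return the start (epoch seconds) of the one-minute window containing ts."""
    return int(ts // 60) * 60


def count_by_minute_and_label(raw_events):
    """Count events per (window_start, label).

    raw_events: iterable of JSON strings with the hypothetical fields
    "ts" (epoch seconds) and "label" (attack label).
    """
    counts = Counter()
    for raw in raw_events:
        event = json.loads(raw)
        counts[(minute_bucket(event["ts"]), event["label"])] += 1
    return counts


if __name__ == "__main__":
    sample = [
        json.dumps({"ts": 0, "label": "Benign"}),
        json.dumps({"ts": 59.9, "label": "Benign"}),
        json.dumps({"ts": 60, "label": "DDoS"}),
    ]
    for (window_start, label), n in sorted(count_by_minute_and_label(sample).items()):
        print(window_start, label, n)
```

In a Structured Streaming job the same grouping would typically be expressed with `pyspark.sql.functions.window` and a `"1 minute"` duration; whether the stream processor here does exactly that is not stated.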

Batch path: a 200k-row CSV lives in GCS. scripts/run-batch.sh spins up a single-node Dataproc cluster, runs two Python MapReduce jobs (top-K destination ports per hour, and an inverted index of attackers per port), runs the equivalent PySpark job, copies the input into HDFS via DistCp, and tears the cluster down.
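As a rough illustration of the first MapReduce job (top-K destination ports per hour), here is a minimal sketch with the map and reduce phases written as plain Python functions. It is not the repository's Hadoop Streaming code, which would read stdin and emit tab-separated lines. The CSV column names `ts` and `dst_port` are assumptions.

```python
import csv
import heapq
import io
from collections import Counter, defaultdict
from datetime import datetime, timezone


def map_rows(csv_text):
    """Mapper: emit ((hour, dst_port), 1) for every CSV row.

    Assumes hypothetical columns "ts" (epoch seconds) and "dst_port".
    """
    for row in csv.DictReader(io.StringIO(csv_text)):
        ts = datetime.fromtimestamp(float(row["ts"]), tz=timezone.utc)
        yield (ts.strftime("%Y-%m-%dT%H"), int(row["dst_port"])), 1


def reduce_top_k(pairs, k):
    """Reducer: sum counts per (hour, port), then keep the k busiest ports per hour.

    Ties on count are broken in favour of the lower port number.
    """
    per_hour = defaultdict(Counter)
    for (hour, port), n in pairs:
        per_hour[hour][port] += n
    return {
        hour: heapq.nlargest(k, ports.items(), key=lambda kv: (kv[1], -kv[0]))
        for hour, ports in per_hour.items()
    }
```

Calling `reduce_top_k(map_rows(text), 3)` returns a dict from hour strings to lists of `(port, count)` pairs, busiest first.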

Architecture at a glance

                                ┌───────────────────────────────────┐
                                │   GKE cluster: threatflow-gke     │
   ┌─────────────┐              │                                   │
   │  producer   │──events──►   │   Kafka (threatflow-flows)        │
   │ (Python)    │              │       │                           │
   └─────────────┘              │       ▼                           │
                                │   Spark Streaming  ──aggregates── │
                                │       │                           │
                                │       ▼                           │
                                │   Kafka (threatflow-aggregates)   │
                                │       │                           │
                                │       ▼                           │
                                │   Streamlit dashboard             │
                                │                                   │
                                │   Control Center, Prometheus,     │
                                │   Grafana, SonarQube              │
                                └───────────────────────────────────┘

   ┌─────────────┐    ┌─────────────────────────────────────────┐
   │  GCS bucket │◄──►│  Dataproc (ephemeral, on-demand)        │
   │   archive   │    │   MapReduce top-K (Hadoop Streaming)    │
   │  CSV input  │    │   MapReduce inverted index              │
   │             │    │   PySpark batch + HDFS DistCp demo      │
   └─────────────┘    └─────────────────────────────────────────┘
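The "MapReduce inverted index" box in the diagram maps each destination port to the attackers seen on it. A minimal in-memory sketch of that idea follows. It assumes an attacker is the source IP of any flow whose label is not `"Benign"`; that definition and the tuple layout are assumptions for illustration, not taken from the repository.

```python
from collections import defaultdict


def build_inverted_index(flows, benign_label="Benign"):
    """Map each destination port to the sorted list of source IPs that attacked it.

    flows: iterable of (src_ip, dst_port, label) tuples (hypothetical layout).
    """
    index = defaultdict(set)
    for src_ip, dst_port, label in flows:
        if label != benign_label:  # only non-benign flows count as attacks
            index[dst_port].add(src_ip)
    # Sets remove duplicate attackers; sorting makes the output deterministic.
    return {port: sorted(ips) for port, ips in index.items()}
```

In a MapReduce setting the mapper would emit `(dst_port, src_ip)` pairs and the reducer would deduplicate the IPs per port; the function above collapses both phases into one pass.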

Screenshots

Live dashboard:

[Screenshot: Dashboard overview]
[Screenshot: Dashboard time series]

Confluent Control Center "Messages" tab on threatflow-flows:

[Screenshot: Kafka Messages tab]

SonarQube project overview (Quality Gate passed, A/A/A ratings):

[Screenshot: SonarQube]

Grafana cluster dashboard:

[Screenshot: Grafana]

Tech stack

GCP for everything: GKE for the streaming side, Dataproc for batch, GCS for archive, Artifact Registry for images. Provisioning is Terraform with state in GCS. The Confluent stack (Kafka, KRaft controller, Control Center) is deployed via the CFK operator on GKE. Stream processing is PySpark Structured Streaming in local[*] mode running inside a pod. The dashboard is Streamlit consuming Kafka directly with confluent-kafka-python. Observability is kube-prometheus-stack (Prometheus + Grafana + Alertmanager). Static analysis is SonarQube Community.
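To make the "dashboard consuming Kafka directly" part concrete, here is a hedged sketch of a polling helper. It relies only on the `poll()`, `error()`, and `value()` methods that confluent-kafka-python exposes on `Consumer` and `Message`. The function name, the message cap, and the assumption that payloads are JSON objects are illustrative, not the dashboard's actual code.

```python
import json


def drain_aggregates(consumer, max_messages=100, timeout=1.0):
    """Poll up to max_messages records and return the decoded JSON payloads.

    consumer: any object with the confluent_kafka.Consumer poll() interface.
    """
    rows = []
    for _ in range(max_messages):
        msg = consumer.poll(timeout)
        if msg is None:      # nothing arrived within the timeout
            break
        if msg.error():      # skip records that carry an error instead of data
            continue
        rows.append(json.loads(msg.value()))
    return rows


# Possible wiring (the bootstrap address and group id are hypothetical):
#
#   from confluent_kafka import Consumer
#   consumer = Consumer({
#       "bootstrap.servers": "kafka:9092",
#       "group.id": "dashboard",
#       "auto.offset.reset": "latest",
#   })
#   consumer.subscribe(["threatflow-aggregates"])
#   rows = drain_aggregates(consumer)
```

Capping the number of messages per call keeps each dashboard refresh bounded, which matters in Streamlit because the script reruns from the top on every refresh.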

Quickstart

Provisioning takes about 10 minutes from a clean GCP project. Getting the full stack up to a live dashboard takes about 25 minutes.

Prerequisites:

brew install --cask google-cloud-sdk docker
brew install terraform kubectl helm
gcloud auth login
gcloud auth application-default login

First-time GCP setup (once per project):

gcloud projects create <project_id> --name="ThreatFlow"
gcloud config set project <project_id>
gcloud billing projects link <project_id> --billing-account=<billing_id>
gcloud services enable compute.googleapis.com container.googleapis.com \
  dataproc.googleapis.com storage.googleapis.com artifactregistry.googleapis.com \
  iam.googleapis.com monitoring.googleapis.com
gcloud storage buckets create gs://threatflow-tfstate-<suffix> \
  --location=us-central1 --uniform-bucket-level-access
gcloud storage buckets update gs://threatflow-tfstate-<suffix> --versioning

Then edit terraform/backend.tf to point at that state bucket.

Provision infrastructure:

cd terraform
cp terraform.tfvars.example terraform.tfvars   # edit project_id
terraform init
terraform apply

Wire kubectl:

gcloud container clusters get-credentials threatflow-gke \
  --zone us-central1-a --project <project_id>

Build + push images:

PROJECT_ID=<project_id> bash scripts/build-and-push.sh all

Deploy Confluent and the topics:

bash scripts/install-confluent.sh
kubectl apply -f k8s/confluent/20-topic-threatflow-flows.yaml
kubectl apply -f k8s/confluent/21-topic-threatflow-aggregates.yaml

Deploy the app stack:

for m in producer dashboard stream-processor; do
  PROJECT_ID=<project_id> envsubst < k8s/manifests/${m}-deployment.yaml \
    | kubectl apply -f -
done
kubectl -n confluent get pods -w
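The loop above relies on envsubst to fill `${PROJECT_ID}` into each manifest before it reaches kubectl. A rough Python analogue of that templating step, for illustration only, is shown below. The manifest line in the usage note is hypothetical. Unlike envsubst, `safe_substitute` leaves unknown placeholders untouched rather than replacing them with an empty string.

```python
from string import Template


def render_manifest(text, variables):
    """Replace ${NAME} placeholders in a manifest with values from variables.

    Placeholders with no matching key are left as-is.
    """
    return Template(text).safe_substitute(variables)
```

For example, `render_manifest("image: us-central1-docker.pkg.dev/${PROJECT_ID}/threatflow/producer", {"PROJECT_ID": "my-project"})` yields the same line with `my-project` in place of the placeholder.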

Run the batch jobs when you want them:

PROJECT_ID=<project_id> bash scripts/load-sample-data.sh
PROJECT_ID=<project_id> bash scripts/run-batch.sh

Inspecting

# Streamlit dashboard
kubectl -n confluent port-forward svc/dashboard 8501:8501

# Confluent Control Center "Messages" tab
kubectl -n confluent port-forward svc/controlcenter 9021:9021

# Grafana (admin / threatflow)
kubectl -n monitoring port-forward svc/kps-grafana 3000:80

# SonarQube (admin / admin)
kubectl -n cicd port-forward svc/sonarqube-sonarqube 9000:9000

Optional add-ons

bash scripts/install-monitoring.sh    # Prometheus + Grafana
bash scripts/install-sonarqube.sh     # SonarQube

Security

The cluster uses Workload Identity, private nodes (no public IPs), and a least-privilege node service account with three GCP roles (logs, metrics, Artifact Registry read). The GCS archive bucket uses uniform bucket-level access and allows no public access.

The SonarQube scan of the Python code came back clean: 0 bugs, 0 vulnerabilities, 0 code smells, and A/A/A ratings. All 38 security hotspots were reviewed as false positives, mostly random.random() calls in the synthetic data generators, where the values are never used for anything cryptographic.
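To show why a non-cryptographic PRNG is acceptable in this role, here is a minimal sketch of a seeded synthetic generator. It is not the repository's generator; the field names, label set, and port list are all hypothetical. The values are test data rather than secrets, and seeding makes runs reproducible, which a cryptographic source would not allow.

```python
import random

ATTACK_LABELS = ["Benign", "DDoS", "BruteForce"]  # hypothetical label set


def synthetic_flow(rng):
    """Build one fake flow record; every field name here is illustrative."""
    return {
        "src_ip": "10.0.{}.{}".format(rng.randrange(256), rng.randrange(1, 255)),
        "dst_port": rng.choice([22, 80, 443, 3389]),
        "label": rng.choice(ATTACK_LABELS),
    }


def generate(n, seed):
    """Return n records; the same seed always yields the same records."""
    rng = random.Random(seed)  # private PRNG instance, independent of global state
    return [synthetic_flow(rng) for _ in range(n)]
```

If any generated value were ever used as a token, key, or password, the standard-library `secrets` module would be the appropriate source instead.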

Teardown

kubectl delete namespace confluent monitoring cicd --wait=false
bash scripts/teardown.sh
gcloud billing projects unlink <project_id>

Repo layout

threatflow/
├── README.md
├── sonar-project.properties      SonarQube scanner config
├── terraform/                    IaC: network, gke, gcs, artifact_registry
├── apps/                         producer, stream-processor, dashboard, batch-jobs
├── k8s/                          Confluent + monitoring + cicd manifests
└── scripts/                      install + build + run + teardown helpers

Dataset

The schema follows CIC-IDS2018 (academic use only, citation required; see LICENSE). The implementation uses a synthetic generator with the same schema; the real dataset informed the design but is not redistributed.
