zerotrustprivacy/GCP-pipeline

Real-Time Patient Telemetry Streaming on Google Cloud

Overview

This project implements a real-time data pipeline on Google Cloud Platform (GCP). It continuously ingests simulated patient telemetry messages via Pub/Sub, processes and aggregates them in a streaming Dataflow job (Apache Beam), and writes results to BigQuery for immediate analysis.

  • Language: Python
  • Streaming framework: Apache Beam on Dataflow
  • Ingestion: Cloud Pub/Sub
  • Storage/Analytics: BigQuery

Architecture

The pipeline processes messages from Pub/Sub, performs per-patient aggregations in fixed windows, and writes aggregated results to BigQuery.

Architecture Diagram
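The flow described above can be sketched in Beam roughly as follows. This is a minimal illustration, not the contents of streaming_pipeline.py: the function names (`subscription_path`, `build_pipeline`), the step labels, and the schema string are assumptions, and `patient_id` is assumed to be an integer as in the sample messages.

```python
import json


def subscription_path(project_id, subscription):
    """Build the fully qualified path that ReadFromPubSub expects."""
    return f"projects/{project_id}/subscriptions/{subscription}"


def build_pipeline(pipeline, project_id, subscription, output_table):
    """Attach read -> window -> aggregate -> write steps to a Beam pipeline."""
    # Imported here so the helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows

    def to_kv(raw):
        # Pub/Sub delivers the payload as bytes; key each reading by patient.
        record = json.loads(raw.decode("utf-8"))
        return record["patient_id"], record["heart_rate"]

    def to_row(kv, window=beam.DoFn.WindowParam):
        # One output row per patient per window.
        patient_id, rates = kv
        rates = list(rates)
        return {
            "start_time": window.start.to_rfc3339(),
            "end_time": window.end.to_rfc3339(),
            "patient_id": patient_id,
            "avg_heart_rate": sum(rates) / len(rates),
            "min_heart_rate": min(rates),
            "max_heart_rate": max(rates),
            "num_readings": len(rates),
        }

    # Assumed column types; adjust to match your table.
    schema = (
        "start_time:STRING,end_time:STRING,patient_id:INTEGER,"
        "avg_heart_rate:FLOAT,min_heart_rate:INTEGER,"
        "max_heart_rate:INTEGER,num_readings:INTEGER"
    )
    return (
        pipeline
        | "Read" >> beam.io.ReadFromPubSub(
            subscription=subscription_path(project_id, subscription))
        | "ToKeyValue" >> beam.Map(to_kv)
        | "Window" >> beam.WindowInto(FixedWindows(60))
        | "GroupByPatient" >> beam.GroupByKey()
        | "ToRow" >> beam.Map(to_row)
        | "Write" >> beam.io.WriteToBigQuery(
            output_table,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )
```

The sketch groups readings with `GroupByKey` for readability; a `CombineFn` would scale better for patients with many readings per window.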

Repository contents

  • streaming_pipeline.py — Beam pipeline that reads from Pub/Sub, aggregates metrics, and writes to BigQuery.
  • healthcare_pipeline.py — Additional example pipeline (see source for details).
  • telemetry_schema.json — Example schema reference for telemetry (optional usage depending on your flow).
  • README.md — This guide.

Prerequisites

  • A Google Cloud Project with billing enabled
  • Python 3.8+ and pip
  • Google Cloud SDK (gcloud) authenticated and configured
  • IAM permissions to create Pub/Sub, BigQuery, and GCS resources, and to run Dataflow jobs

Recommended roles for the job runner/service account:

  • Dataflow Admin
  • Pub/Sub Subscriber (for the subscription)
  • BigQuery Data Editor (on the target dataset)
  • Storage Object Admin (on the staging/temp buckets)

Enable required APIs

gcloud services enable \
  dataflow.googleapis.com \
  pubsub.googleapis.com \
  bigquery.googleapis.com \
  storage-component.googleapis.com

Install dependencies

pip install --upgrade pip
pip install "apache-beam[gcp]" google-cloud-pubsub google-cloud-bigquery pandas

Configure environment

export PROJECT_ID="YOUR_GCP_PROJECT_ID"
export REGION="us-central1" # or your preferred region
gcloud config set project "$PROJECT_ID"

Provision GCP resources

  1. Create a GCS bucket for Dataflow staging and temp files:
export BUCKET="${PROJECT_ID}-dataflow"
gsutil mb -l "$REGION" "gs://${BUCKET}"
  2. Create a Pub/Sub topic and subscription:
gcloud pubsub topics create telemetry-data-stream --project="$PROJECT_ID"

gcloud pubsub subscriptions create telemetry-data-stream-sub \
  --topic=telemetry-data-stream \
  --ack-deadline=600 \
  --message-retention-duration=7d \
  --project="$PROJECT_ID"
  3. Create a BigQuery dataset for outputs:
bq --location="$REGION" mk -d "${PROJECT_ID}:telemetry_analytics"

Run the streaming pipeline on Dataflow

The pipeline expects JSON messages with at least patient_id and heart_rate fields. It aggregates heart rates per patient in 1-minute fixed windows and writes windowed results to BigQuery.
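To make the aggregation concrete, here is a plain-Python sketch of what a 1-minute fixed window computes, with no Beam involved. The function names and the `(timestamp_seconds, patient_id, heart_rate)` input shape are illustrative assumptions, not the pipeline's actual code.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 60


def window_start(ts, size=WINDOW_SECONDS):
    """Floor a Unix timestamp to the start of its fixed window."""
    return int(ts // size) * size


def rfc3339(ts):
    """Format a Unix timestamp as an RFC 3339 UTC string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def aggregate(readings):
    """Group (timestamp_seconds, patient_id, heart_rate) tuples by window and patient."""
    buckets = defaultdict(list)
    for ts, patient_id, heart_rate in readings:
        buckets[(window_start(ts), patient_id)].append(heart_rate)

    rows = []
    # Sort by window start, then patient, for a stable output order.
    for (start, patient_id), rates in sorted(
        buckets.items(), key=lambda item: (item[0][0], str(item[0][1]))
    ):
        rows.append({
            "start_time": rfc3339(start),
            "end_time": rfc3339(start + WINDOW_SECONDS),
            "patient_id": patient_id,
            "avg_heart_rate": sum(rates) / len(rates),
            "min_heart_rate": min(rates),
            "max_heart_rate": max(rates),
            "num_readings": len(rates),
        })
    return rows
```

Two readings for patient 101 at seconds 0 and 30 land in the same window, while a reading at second 61 starts a new one.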

Example run:

python streaming_pipeline.py \
  --project "$PROJECT_ID" \
  --region "$REGION" \
  --input_subscription "telemetry-data-stream-sub" \
  --output_table "${PROJECT_ID}:telemetry_analytics.heart_rate_windowed_stats" \
  --temp_location "gs://${BUCKET}/tmp" \
  --staging_location "gs://${BUCKET}/staging" \
  --job_name "telemetry-streaming-job"

Notes:

  • The pipeline is configured to use DataflowRunner.
  • Ensure the region you choose is supported by Dataflow, Pub/Sub, and BigQuery.
  • The output table will be created if it doesn’t exist (schema autodetect is enabled for the write).
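One common way to handle a command line like the one above is to parse the pipeline-specific flags with argparse and pass everything else through to Beam. The sketch below assumes that pattern; `parse_args` and `make_options` are hypothetical names, and the actual script may be organized differently.

```python
import argparse


def parse_args(argv):
    """Split pipeline-specific flags from the flags Beam handles itself."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_subscription", required=True,
                        help="Pub/Sub subscription to read from")
    parser.add_argument("--output_table", required=True,
                        help="BigQuery table as PROJECT_ID:DATASET.TABLE")
    # Unrecognized flags (--project, --region, --temp_location, ...) are
    # returned untouched so Beam can interpret them.
    return parser.parse_known_args(argv)


def make_options(beam_args):
    """Build streaming PipelineOptions from the leftover flags."""
    # Imported here so parse_args works without Beam installed.
    from apache_beam.options.pipeline_options import PipelineOptions

    return PipelineOptions(beam_args, streaming=True, save_main_session=True)
```

With this split, `known.input_subscription` and `known.output_table` feed the pipeline's transforms, while the remaining list goes to `make_options`.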

Publish test messages

Use the gcloud CLI to publish sample messages to the topic:

gcloud pubsub topics publish telemetry-data-stream \
  --message='{"patient_id": 101, "heart_rate": 82}'
gcloud pubsub topics publish telemetry-data-stream \
  --message='{"patient_id": 101, "heart_rate": 90}'
gcloud pubsub topics publish telemetry-data-stream \
  --message='{"patient_id": 202, "heart_rate": 76}'
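For a steadier stream of test data, the same messages can be published from Python with the google-cloud-pubsub client installed earlier. This is a sketch under assumptions: `make_reading` and `publish_readings` are hypothetical helpers, and the 55 to 120 heart-rate range is arbitrary.

```python
import json
import random


def make_reading(patient_id, rng):
    """Encode one simulated reading as the JSON bytes the pipeline expects."""
    reading = {"patient_id": patient_id, "heart_rate": rng.randint(55, 120)}
    return json.dumps(reading).encode("utf-8")


def publish_readings(project_id, topic, patient_ids, count=10, seed=None):
    """Publish `count` simulated readings and return their message IDs."""
    # Imported here so make_reading works without the client installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic)
    rng = random.Random(seed)
    futures = [
        publisher.publish(topic_path, data=make_reading(rng.choice(patient_ids), rng))
        for _ in range(count)
    ]
    # Block until every publish has been acknowledged by Pub/Sub.
    return [future.result() for future in futures]
```

A call such as `publish_readings(PROJECT_ID, "telemetry-data-stream", [101, 202])` would send ten readings spread across the two patients.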

Message expectations:

  • JSON-encoded string
  • Contains patient_id (string or integer) and heart_rate (number)
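These expectations can be checked with a small parser that returns `None` for anything it cannot use. The function below is an illustrative sketch (the name `parse_telemetry` is an assumption), not the validation code from the pipeline itself.

```python
import json


def parse_telemetry(raw):
    """Return (patient_id, heart_rate), or None if the message is unusable."""
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid UTF-8 or not valid JSON
    if not isinstance(record, dict):
        return None  # valid JSON, but not an object

    patient_id = record.get("patient_id")
    heart_rate = record.get("heart_rate")
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(patient_id, bool) or not isinstance(patient_id, (int, str)):
        return None
    if isinstance(heart_rate, bool) or not isinstance(heart_rate, (int, float)):
        return None
    return patient_id, heart_rate
```

Returning `None` instead of raising lets a streaming job drop bad messages with a simple filter rather than failing the whole bundle.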

BigQuery output schema

The pipeline writes aggregated rows with the following fields:

  • start_time (STRING/RFC3339) — start of the 1-minute window
  • end_time (STRING/RFC3339) — end of the 1-minute window
  • patient_id (INTEGER/STRING) — patient identifier
  • avg_heart_rate (FLOAT) — average heart rate over the window
  • min_heart_rate (INTEGER) — minimum heart rate over the window
  • max_heart_rate (INTEGER) — maximum heart rate over the window
  • num_readings (INTEGER) — number of readings in the window

Because schema autodetect is used, BigQuery will infer types on first write. For stricter control, create the table up front with an explicit schema and point the pipeline to it.
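A sketch of the explicit-schema route, using the google-cloud-bigquery client installed earlier. The column types follow the list above, with `patient_id` assumed to be INTEGER to match the sample messages; `beam_schema_string` and `create_table` are hypothetical helper names.

```python
# Column names and types as documented above (patient_id type is an assumption).
SCHEMA = [
    ("start_time", "STRING"),
    ("end_time", "STRING"),
    ("patient_id", "INTEGER"),
    ("avg_heart_rate", "FLOAT"),
    ("min_heart_rate", "INTEGER"),
    ("max_heart_rate", "INTEGER"),
    ("num_readings", "INTEGER"),
]


def beam_schema_string(fields=SCHEMA):
    """Render the schema as the 'name:TYPE,...' string WriteToBigQuery accepts."""
    return ",".join(f"{name}:{field_type}" for name, field_type in fields)


def create_table(table_id):
    """Create the table up front; table_id is 'project.dataset.table'."""
    # Imported here so the schema helpers work without the client installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    table = bigquery.Table(
        table_id,
        schema=[bigquery.SchemaField(name, field_type) for name, field_type in SCHEMA],
    )
    return client.create_table(table, exists_ok=True)
```

Note that the BigQuery client takes the dot-separated `project.dataset.table` form, whereas the pipeline flag uses `PROJECT_ID:DATASET.TABLE`.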

Example analysis query

Average heart rate per patient in the most recent hour:

SELECT
  patient_id,
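  -- Weight each window by its reading count; a plain AVG over window averages would be a mean of means.
  SAFE_DIVIDE(SUM(avg_heart_rate * num_readings), SUM(num_readings)) AS hourly_avg_heart_rate,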
  SUM(num_readings)   AS total_readings
FROM `YOUR_GCP_PROJECT_ID.telemetry_analytics.heart_rate_windowed_stats`
WHERE TIMESTAMP(start_time) >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY patient_id
ORDER BY total_readings DESC;

Troubleshooting

  • Permissions: Ensure the Dataflow worker service account can access Pub/Sub, GCS, and BigQuery as noted above.
  • Region mismatches: Keep Dataflow, Pub/Sub, BigQuery dataset, and GCS buckets in compatible regions.
  • Table naming: For --output_table, prefer the fully qualified format PROJECT_ID:DATASET.TABLE.
  • Message shape: Messages that are not valid JSON, or that lack a required field, are skipped; ensure every message includes patient_id and heart_rate.
  • Windowing: This pipeline uses processing-time fixed windows (60s). If you need event-time semantics, add timestamps to messages and configure Beam with appropriate timestamp extraction and allowed lateness.
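As a rough sketch of the event-time option mentioned in the last bullet: assuming each message carries an RFC 3339 `event_time` field (a hypothetical field, not part of the current message format), records can be re-stamped before windowing. The 120-second allowed lateness is an arbitrary example value.

```python
from datetime import datetime, timezone


def event_time_seconds(record):
    """Convert the (hypothetical) 'event_time' field to Unix seconds."""
    # fromisoformat on older Python versions does not accept a trailing 'Z'.
    ts = datetime.fromisoformat(record["event_time"].replace("Z", "+00:00"))
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assume UTC when no offset is given
    return ts.timestamp()


def window_by_event_time(records, window_seconds=60, allowed_lateness_seconds=120):
    """Re-stamp parsed records with their event time, then apply fixed windows."""
    # Imported here so event_time_seconds works without Beam installed.
    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows, TimestampedValue

    return (
        records
        | "AttachEventTime" >> beam.Map(
            lambda r: TimestampedValue(r, event_time_seconds(r)))
        | "EventTimeWindow" >> beam.WindowInto(
            FixedWindows(window_seconds),
            allowed_lateness=allowed_lateness_seconds,
        )
    )
```

If the timestamp travels as a Pub/Sub message attribute rather than in the JSON body, the `timestamp_attribute` argument of `ReadFromPubSub` is an alternative that lets the source assign it directly.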

Cleaning up

To avoid ongoing charges:

  • Cancel the Dataflow job once testing is complete.
  • Delete the Pub/Sub subscription and topic if no longer needed.
  • Remove the BigQuery dataset/table and GCS bucket if appropriate.

gcloud pubsub subscriptions delete telemetry-data-stream-sub --project="$PROJECT_ID"
gcloud pubsub topics delete telemetry-data-stream --project="$PROJECT_ID"
bq rm -r -f -d "${PROJECT_ID}:telemetry_analytics"
gsutil rm -r "gs://${BUCKET}"
