A production-grade, event-driven pipeline that extracts structured data from PDF claim forms using Gemini 2.5 Flash (Vertex AI), with multi-pass validation, error recovery, and multi-environment support. Deployed as two Cloud Run microservices.
```
BigQuery (pending claims)
          │
          ▼
┌──────────────────┐
│  Recon Handler   │── fetches PDFs from Google Drive ──► GCS Bucket
└──────────────────┘                                         │
                                                    (GCS Cloud Event)
                                                             │
                                                             ▼
                                              ┌───────────────────────┐
                                              │  Extraction Service   │
                                              │  (Gemini 2.5 Flash)   │
                                              └───────────┬───────────┘
                                                          │
                                         ┌────────────────┼────────────────┐
                                         ▼                ▼                ▼
                                    AI_CLAIM_        AI_CLAIM_        AI_CLAIM_
                                     MASTER          ACTIVITIES        DETAILS
                                    (BigQuery)       (BigQuery)       (BigQuery)
```
Orchestrates the intake pipeline:
- Queries BigQuery for pending/failed claim documents
- Downloads PDFs from Google Drive (individual files, or merged from folders, skipping excluded document types)
- Uploads PDFs to GCS, triggering the extraction service via Cloud Events
- Marks rows as `PROCESSING` in BigQuery (direct SQL or Sheets API depending on environment)
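The environment-dependent status update can be sketched as follows. This is a hypothetical reconstruction: the function name, table layout, and return shape are illustrative, not the actual code in `recon-handler/main.py`.

```python
# Hypothetical sketch: how the recon handler might mark one claim row as
# PROCESSING. TEST writes back via the Sheets API; DEV/PROD issue SQL.
def build_status_update(env: str, claim_id: str, table: str) -> dict:
    """Describe the status update to perform for one claim row."""
    if env == "TEST":
        # TEST environments write status back through the Google Sheets API.
        return {"method": "sheets", "claim_id": claim_id, "status": "PROCESSING"}
    # DEV and PROD run a direct BigQuery UPDATE (parameterized in practice).
    statement = f"UPDATE `{table}` SET STATUS = @status WHERE CLAIM_ID = @claim_id"
    return {"method": "sql", "statement": statement,
            "params": {"status": "PROCESSING", "claim_id": claim_id}}
```

Keeping the decision in one helper means the Cloud Event handler itself stays identical across environments.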
Processes individual PDFs on GCS upload events:
- Uses Gemini 2.5 Flash to extract structured data in three passes:
  - Header: Claim ID, Account Name/Number, Promo Dates, Total Cost
  - Activities: Brand, Category, Charge Code, Amount, Approver
  - Supporting Docs: Invoice/Debit Note/Credit Note transaction records
- Smart recovery passes: Re-runs targeted prompts if any section is empty after the initial extraction
- Refinement steps: E-invoice reclassification, supplier name correction, deduplication
- Validation: Compares extracted totals against BigQuery source records; retries up to 3× on mismatch
- Writes results to three BigQuery tables and updates source record status
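The pass-then-recover flow above can be sketched as a small loop. `extract_section` stands in for one Gemini 2.5 Flash prompt per section; everything here (names, the single recovery pass) is illustrative, not the service's actual code.

```python
# Minimal sketch of multi-pass extraction with targeted recovery prompts.
SECTIONS = ("header", "activities", "details")

def extract_with_recovery(extract_section, max_recovery_passes=1):
    """Run one pass per section, then re-prompt any section that came back empty."""
    result = {name: extract_section(name) for name in SECTIONS}
    for _ in range(max_recovery_passes):
        empty = [name for name, value in result.items() if not value]
        if not empty:
            break  # every section produced data
        for name in empty:
            # Targeted recovery prompt for just this section
            result[name] = extract_section(name)
    return result
```

Because recovery re-runs only the empty sections, a flaky activities pass does not cost a full re-extraction of the header and details.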
- Multi-pass extraction with independent recovery prompts per section (header / activities / details)
- Typed JSON schema enforcement via Gemini's structured output mode
- Validation with error codes (`ERR001`–`ERR003`) and automatic retry on failure
- Multi-environment support (`DEV`/`TEST`/`PROD`) with different project IDs, table patterns, and update methods
- Google Drive integration: handles both direct file links and folder-merged PDFs
- Sheets API status writeback for test environments
- Cloud Run + Cloud Events: fully serverless, scales to zero
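To illustrate the structured-output feature: Gemini's structured output mode takes an OpenAPI-style response schema. The field names below are guesses from the header fields listed earlier, not the service's real schema.

```python
# Hypothetical response schema for the header pass (field names illustrative).
# Vertex AI schemas use uppercase type names (OBJECT, STRING, NUMBER).
HEADER_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "claim_id": {"type": "STRING"},
        "account_name": {"type": "STRING"},
        "account_number": {"type": "STRING"},
        "promo_start_date": {"type": "STRING"},
        "promo_end_date": {"type": "STRING"},
        "total_cost": {"type": "NUMBER"},
    },
    "required": ["claim_id", "total_cost"],
}

# Passed to the model roughly as:
#   config = {"response_mime_type": "application/json",
#             "response_schema": HEADER_SCHEMA}
```

Enforcing the schema at the model level means the parser downstream never sees free-form text, only typed JSON.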
- Python 3.11, `functions-framework`
- Google Vertex AI (Gemini 2.5 Flash)
- Google Cloud Run, Cloud Storage, BigQuery
- Google Drive API, Google Sheets API
- Google OAuth2, PyPDF2
```
├── extraction-service/
│   ├── main.py            # Cloud Event handler, Gemini prompts, extraction + validation logic
│   ├── tools/
│   │   ├── bq.py          # BigQuery read/write + environment config
│   │   └── sql.py         # SQL value formatting helpers
│   ├── Dockerfile
│   └── requirements.txt
├── recon-handler/
│   ├── main.py            # Cloud Event handler, Drive download, GCS upload
│   ├── Dockerfile
│   └── requirements.txt
├── .env.example
└── .gcloudignore
```
Both services read the ENV environment variable (DEV / TEST / PROD) to select their GCP project, BigQuery table pattern, and status update method:
| Env | Project | Table Pattern | Status Update |
|---|---|---|---|
| `DEV` | `your-gcp-project-dev` | `CLAIM_{type}_V1` | Direct SQL |
| `TEST` | `your-gcp-project-prod` | `AI_UAT_CLAIM_{type}` | Sheets API |
| `PROD` | `your-gcp-project-prod` | View read / Table write | Direct SQL |
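In code, this table reduces to a small config map, roughly what `tools/bq.py` encodes. This is a sketch: the `AI_CLAIM_{type}` write pattern for PROD is an assumption based on the architecture diagram (PROD reads go through views), and the default dataset comes from `DATASET_ID`.

```python
# Hypothetical reconstruction of the per-environment config (names illustrative).
ENV_CONFIG = {
    "DEV":  {"project": "your-gcp-project-dev",  "pattern": "CLAIM_{type}_V1"},
    "TEST": {"project": "your-gcp-project-prod", "pattern": "AI_UAT_CLAIM_{type}"},
    "PROD": {"project": "your-gcp-project-prod", "pattern": "AI_CLAIM_{type}"},  # assumed write pattern
}

def table_ref(env: str, table_type: str, dataset: str = "CLAIM_DATASET") -> str:
    """Fully qualified BigQuery table for one claim table type."""
    cfg = ENV_CONFIG[env]
    return f"{cfg['project']}.{dataset}.{cfg['pattern'].format(type=table_type)}"
```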
| Variable | Description |
|---|---|
| `ENV` | `DEV`, `TEST`, or `PROD` |
| `CLIENT_ID` | Google OAuth2 Client ID |
| `CLIENT_SECRET` | Google OAuth2 Client Secret |
| `REFRESH_TOKEN` | Google OAuth2 Refresh Token |
| `DATASET_ID` | BigQuery dataset (default: `CLAIM_DATASET`) |
| `GEMINI_MODEL` | Gemini model name (default: `gemini-2.5-flash`) |
| `SPREADSHEET_ID` | Google Sheet ID (`TEST` env only) |
After extraction, totals are validated against source BigQuery records:
| Code | Description |
|---|---|
| `ERR001` | Account name mismatch |
| `ERR002` | Total cost mismatch (sum of details vs. expected) |
| `ERR003` | Claim ID mismatch or healed from filename |
Failed validations trigger up to 3 full re-extraction attempts.
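The validate-and-retry loop can be sketched as below. Error codes match the table above; `extract`, the field names, and the 0.01 tolerance are illustrative assumptions, not the service's exact logic.

```python
# Hypothetical sketch of validation against the BigQuery source record.
def validate(extracted: dict, source: dict) -> list[str]:
    errors = []
    if extracted.get("account_name") != source.get("account_name"):
        errors.append("ERR001")  # account name mismatch
    detail_sum = sum(d["amount"] for d in extracted.get("details", []))
    if abs(detail_sum - source.get("total_cost", 0.0)) > 0.01:
        errors.append("ERR002")  # total cost mismatch (details vs. expected)
    if extracted.get("claim_id") != source.get("claim_id"):
        errors.append("ERR003")  # claim ID mismatch
    return errors

def extract_validated(extract, source, max_attempts=3):
    """Re-run the full extraction up to max_attempts times until it validates."""
    for _ in range(max_attempts):
        extracted = extract()
        errors = validate(extracted, source)
        if not errors:
            return extracted, []
    return extracted, errors  # caller records the remaining error codes
```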
```shell
# Extraction service
cd extraction-service
gcloud run deploy extraction-service \
  --source . \
  --region asia-southeast1 \
  --set-env-vars ENV=PROD,CLIENT_ID=...,CLIENT_SECRET=...,REFRESH_TOKEN=...

# Recon handler
cd ../recon-handler
gcloud run deploy recon-handler \
  --source . \
  --region asia-southeast1 \
  --set-env-vars ENV=PROD,CLIENT_ID=...,CLIENT_SECRET=...,REFRESH_TOKEN=...
```

MIT