Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
173 changes: 173 additions & 0 deletions computing/misc/agriculture_census/gee_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
GEE Export for Agriculture Census

Takes the matched agriculture census data, joins it with SOI tehsil boundary
geometries, and publishes the enriched vector layer as an Earth Engine asset.

The output FeatureCollection has per-tehsil polygons with properties:
- crop_name, area_hectares (for each crop)
- matched_tehsil, match_score
- state, district, tehsil

This produces a tehsil-level vectorized crop map that can be used
in the Know Your Landscape dashboard and other downstream apps.
"""

import ee
import geopandas as gpd
import pandas as pd

from utilities.gee_utils import (
ee_initialize,
gdf_to_ee_fc,
export_vector_asset_to_gee,
check_task_status,
is_gee_asset_exists,
make_asset_public,
valid_gee_text,
get_gee_asset_path,
)
from computing.utils import (
sync_fc_to_geoserver,
save_layer_info_to_db,
update_layer_sync_status,
)
from nrm_app.celery import app


def enrich_tehsil_boundaries(matched_csv_path, boundary_geojson_path):
    """Join matched agriculture census data with SOI tehsil boundaries.

    Performs an inner join between census rows whose tehsil names were
    matched (``match_type`` of "exact" or "fuzzy") and the boundary
    polygons, keyed on a normalized (stripped, lower-cased) tehsil name.

    Args:
        matched_csv_path: Path to matched agriculture census CSV
            (output of pipeline.py with matched_tehsil column)
        boundary_geojson_path: Path to SOI tehsil boundary GeoJSON

    Returns:
        GeoDataFrame with tehsil polygons enriched with crop attributes,
        in EPSG:4326. Empty GeoDataFrame if no census rows were matched.

    Raises:
        ValueError: If no column containing "tehsil" is found in the
            boundary file.
    """
    census_df = pd.read_csv(matched_csv_path)
    boundaries_gdf = gpd.read_file(boundary_geojson_path)

    # Standardize boundary column names
    boundaries_gdf.columns = [
        c.strip().lower().replace(" ", "_") for c in boundaries_gdf.columns
    ]

    # Find tehsil name column in boundaries. Columns were lower-cased
    # above, so a plain substring check is sufficient.
    tehsil_col = next(
        (col for col in boundaries_gdf.columns if "tehsil" in col), None
    )

    if tehsil_col is None:
        raise ValueError(
            "Could not find tehsil column in boundary file. "
            f"Available: {list(boundaries_gdf.columns)}"
        )

    # Filter to matched records only
    matched = census_df[census_df["match_type"].isin(["exact", "fuzzy"])].copy()

    if matched.empty:
        print("No matched records found.")
        return gpd.GeoDataFrame()

    # Normalize for join. Cast to str first so the .str accessor does not
    # silently produce NaN keys on non-string dtypes (e.g. a tehsil column
    # that pandas parsed as numeric codes), which would drop rows from the
    # inner join without any warning.
    matched["_join_key"] = (
        matched["matched_tehsil"].astype(str).str.strip().str.lower()
    )
    boundaries_gdf["_join_key"] = (
        boundaries_gdf[tehsil_col].astype(str).str.strip().str.lower()
    )

    # Inner join: keep only boundary polygons that have census data
    enriched = boundaries_gdf.merge(matched, on="_join_key", how="inner")
    enriched = enriched.drop(columns=["_join_key"], errors="ignore")

    # Ensure EPSG:4326 (expected by GEE / GeoServer downstream)
    if enriched.crs is None:
        enriched = enriched.set_crs("EPSG:4326")
    elif enriched.crs.to_epsg() != 4326:
        enriched = enriched.to_crs("EPSG:4326")

    print(f"Enriched {len(enriched)} tehsil polygons with crop data")
    return enriched


def export_to_geojson(enriched_gdf, output_path):
    """Write the enriched GeoDataFrame to disk as a GeoJSON file."""
    driver = "GeoJSON"
    enriched_gdf.to_file(output_path, driver=driver)
    print(f"Exported GeoJSON to {output_path}")


@app.task(bind=True)
def publish_agri_census_to_gee(
    self,
    matched_csv_path,
    boundary_geojson_path,
    state,
    district,
    block,
    gee_account_id,
):
    """Celery task to publish agriculture census as a GEE vector asset.

    Args:
        matched_csv_path: Path to matched agriculture census CSV.
        boundary_geojson_path: Path to SOI tehsil boundary GeoJSON.
        state: State name (used in asset path and DB record).
        district: District name (used in asset path and layer name).
        block: Block name (used in asset path and layer name).
        gee_account_id: Earth Engine account to initialize with.

    Workflow:
        1. Enrich tehsil boundaries with crop data
        2. Convert to ee.FeatureCollection
        3. Export to GEE as vector asset
        4. Sync to GeoServer
        5. Save layer info to DB
    """
    ee_initialize(gee_account_id)

    description = (
        f"agri_census_{valid_gee_text(district)}_{valid_gee_text(block)}"
    )
    asset_id = get_gee_asset_path(state, district, block) + description

    # Idempotency guard: a previous run already published this asset.
    if is_gee_asset_exists(asset_id):
        print(f"Asset already exists: {asset_id}")
        return

    # Step 1: Enrich boundaries
    print("Enriching tehsil boundaries with crop data...")
    enriched_gdf = enrich_tehsil_boundaries(
        matched_csv_path, boundary_geojson_path
    )

    if enriched_gdf.empty:
        print("No matched tehsils found. Skipping.")
        return

    # Step 2: Convert to FeatureCollection
    print("Converting to Earth Engine FeatureCollection...")
    fc = gdf_to_ee_fc(enriched_gdf)

    # Step 3: Export to GEE
    print(f"Exporting to GEE asset: {asset_id}")
    task_id = export_vector_asset_to_gee(fc, description, asset_id)

    if not task_id:
        # Previously a falsy task id meant the task ended silently with no
        # layer published and no log line; surface the failure explicitly.
        print(f"Failed to start GEE export task for {asset_id}. Aborting.")
        return

    check_task_status(task_id)
    make_asset_public(asset_id)
    print(f"Published agriculture census asset: {asset_id}")

    # Step 4: Sync to GeoServer
    layer_name = (
        valid_gee_text(district) + "_" + valid_gee_text(block) + "_agri_census"
    )
    sync_fc_to_geoserver(asset_id, layer_name)

    # Step 5: Save to DB
    save_layer_info_to_db(
        state=state,
        district=district,
        block=block,
        layer_name=layer_name,
        dataset_name="Agriculture Census",
        metadata={
            "source": "agcensus.da.gov.in",
            "description": "Tehsil-level crop type and area data",
        },
    )
    update_layer_sync_status(layer_name, status="synced")
    print("Done.")
156 changes: 156 additions & 0 deletions computing/misc/agriculture_census/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""
Agriculture Census Pipeline

End-to-end pipeline to:
1. Scrape crop data from the Agriculture Census website
2. Clean and structure the data
3. Match tehsil names to CoRE Stack SOI boundaries
4. Export matched data as CSV for GEE integration

Usage:
python -m computing.misc.agriculture_census.pipeline \
--boundary-file /path/to/soi_tehsil.geojson \
--output-dir /path/to/output \
--states "Madhya Pradesh" "Rajasthan"
"""

import os
import argparse
import json
import pandas as pd
import geopandas as gpd

from .scraper import scrape_agcensus
from .tehsil_matcher import match_tehsils


def run_pipeline(
    boundary_file,
    output_dir,
    states=None,
    max_districts=None,
    headless=True,
    skip_scraping=False,
    scraped_csv=None,
):
    """Run the full agriculture census pipeline.

    Args:
        boundary_file: Path to SOI tehsil boundary GeoJSON
        output_dir: Path to write output files
        states: List of state names to process
        max_districts: Limit districts per state (for testing)
        headless: Run browser headless
        skip_scraping: If True, load from scraped_csv instead
        scraped_csv: Path to previously scraped data CSV

    Returns:
        dict with matched_df and stats
    """
    os.makedirs(output_dir, exist_ok=True)

    # Step 1: Scrape or load data
    print("=" * 60)
    print("Step 1: Getting agriculture census data...")
    print("=" * 60)

    if skip_scraping and scraped_csv:
        print(f"Loading previously scraped data from {scraped_csv}")
        census_df = pd.read_csv(scraped_csv)
    else:
        # skip_scraping without a CSV path previously fell through to a
        # full (slow, browser-driven) scrape with no explanation; warn so
        # the operator knows why scraping started anyway.
        if skip_scraping and not scraped_csv:
            print(
                "Warning: --skip-scraping set but no --scraped-csv given; "
                "scraping anyway."
            )
        census_df = scrape_agcensus(
            output_dir=output_dir,
            states=states,
            max_districts=max_districts,
            headless=headless,
        )

    if census_df.empty:
        print("No data to process. Exiting.")
        return {"matched_df": census_df, "stats": {}}

    print(f" Records: {len(census_df)}")

    # Step 2: Load SOI boundaries (geometry dropped — only attribute
    # columns are needed for name matching)
    print("\n" + "=" * 60)
    print("Step 2: Loading SOI tehsil boundaries...")
    print("=" * 60)

    boundary_gdf = gpd.read_file(boundary_file)
    boundary_df = pd.DataFrame(boundary_gdf.drop(columns="geometry"))
    print(f" Boundary records: {len(boundary_df)}")
    print(f" Columns: {list(boundary_df.columns)}")

    # Step 3: Match tehsils
    print("\n" + "=" * 60)
    print("Step 3: Matching tehsil names...")
    print("=" * 60)

    matched_df, stats = match_tehsils(census_df, boundary_df)

    # Save outputs
    matched_path = os.path.join(output_dir, "agriculture_census_matched.csv")
    matched_df.to_csv(matched_path, index=False)

    stats_path = os.path.join(output_dir, "agri_census_match_stats.json")
    with open(stats_path, "w") as f:
        json.dump(stats, f, indent=2)

    # Plain string: no placeholders, so no f-prefix needed (ruff F541)
    print("\n Match statistics:")
    print(f" Total: {stats['total']}")
    print(f" Exact: {stats['exact']}")
    print(f" Fuzzy: {stats['fuzzy']}")
    print(f" Unmatched: {stats['unmatched']}")
    print(f" Match %: {stats['match_pct']}%")
    print(f"\n Saved to {matched_path}")

    return {"matched_df": matched_df, "stats": stats}


def main():
    """CLI entry point: parse command-line arguments and run the pipeline."""
    cli = argparse.ArgumentParser(
        description="Scrape and process Agriculture Census data"
    )
    cli.add_argument(
        "--boundary-file",
        required=True,
        help="Path to SOI tehsil boundary GeoJSON file",
    )
    cli.add_argument(
        "--output-dir",
        required=True,
        help="Directory to write output files",
    )
    cli.add_argument(
        "--states",
        nargs="*",
        default=None,
        help="States to process (space-separated)",
    )
    cli.add_argument(
        "--max-districts",
        type=int,
        default=None,
        help="Max districts per state (for testing)",
    )
    cli.add_argument(
        "--no-headless",
        action="store_true",
        help="Run browser with visible window",
    )
    cli.add_argument(
        "--skip-scraping",
        action="store_true",
        help="Skip scraping, load from --scraped-csv instead",
    )
    cli.add_argument(
        "--scraped-csv",
        default=None,
        help="Path to previously scraped CSV",
    )

    opts = cli.parse_args()
    run_pipeline(
        boundary_file=opts.boundary_file,
        output_dir=opts.output_dir,
        states=opts.states,
        max_districts=opts.max_districts,
        headless=not opts.no_headless,
        skip_scraping=opts.skip_scraping,
        scraped_csv=opts.scraped_csv,
    )


if __name__ == "__main__":
main()
Loading