Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
164 changes: 164 additions & 0 deletions computing/misc/livestock_census/data_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
Livestock Census Data Loader

Downloads and parses the 20th Livestock Census (DAHD) village and ward level
data. The source xlsx contains four sheets:
- Rural Male Population
- Rural Female Population
- Urban Male Population
- Urban Female Population

Each sheet has columns: state_name, district_name, block_name/town_name,
village_name/ward_name, cattle, buffalo, sheep, goat, pig, ...

This module consolidates all four sheets into a single DataFrame with
standardized columns and an area_type (rural/urban) indicator.
"""

import os
import pandas as pd
import requests
from io import BytesIO

DAHD_URL = (
"https://www.dahd.gov.in/sites/default/files/2023-07/"
"VillageAndWardLevelDataMale-Female.xlsx"
)

LIVESTOCK_TYPES = ["cattle", "buffalo", "sheep", "goat", "pig"]

# Sheet names in the xlsx file
SHEET_CONFIG = {
0: {"area_type": "rural", "sex": "male"},
1: {"area_type": "rural", "sex": "female"},
2: {"area_type": "urban", "sex": "male"},
3: {"area_type": "urban", "sex": "female"},
}


def download_livestock_data(url=DAHD_URL, cache_dir=None):
    """Fetch the DAHD livestock census workbook, optionally via a local cache.

    Args:
        url: Location of the source xlsx file.
        cache_dir: Directory used to cache the downloaded file; created if
            missing. When None, the file is fetched fresh and not cached.

    Returns:
        BytesIO object containing the raw xlsx bytes.
    """
    cache_path = None
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
        cache_path = os.path.join(cache_dir, "livestock_census_20th.xlsx")
        # Serve from cache when a previous run already downloaded the file.
        if os.path.exists(cache_path):
            print(f"Using cached file: {cache_path}")
            with open(cache_path, "rb") as f:
                return BytesIO(f.read())

    print(f"Downloading livestock census data from {url}...")
    resp = requests.get(url, timeout=120)
    resp.raise_for_status()
    content = resp.content

    # Persist the download for subsequent runs when caching is enabled.
    if cache_path:
        with open(cache_path, "wb") as f:
            f.write(content)
        print(f"Cached to {cache_path}")

    return BytesIO(content)


def _normalize_columns(df, area_type):
"""Standardize column names across rural and urban sheets."""
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

rename_map = {}
for col in df.columns:
if "state" in col:
rename_map[col] = "state_name"
elif "district" in col:
rename_map[col] = "district_name"
elif "block" in col:
rename_map[col] = "block_name"
elif "town" in col:
rename_map[col] = "block_name"
elif "village" in col:
rename_map[col] = "village_name"
elif "ward" in col:
rename_map[col] = "village_name"

df = df.rename(columns=rename_map)
df["area_type"] = area_type
return df


def _clean_text(series):
"""Normalize text: strip, lowercase, collapse whitespace."""
return (
series.astype(str)
.str.strip()
.str.lower()
.str.replace(r"\s+", " ", regex=True)
)


def load_livestock_data(url=DAHD_URL, cache_dir=None):
    """Load and consolidate all four sheets of the livestock census.

    Args:
        url: URL of the source xlsx file.
        cache_dir: Optional local cache directory
            (see download_livestock_data).

    Returns:
        pd.DataFrame with columns:
            state_name, district_name, block_name, village_name,
            area_type, sex, cattle, buffalo, sheep, goat, pig
    """
    data = download_livestock_data(url, cache_dir)

    # Parse the workbook ONCE for all sheets. The previous version called
    # read_excel per sheet and seek(0) between calls, re-parsing the whole
    # xlsx four times.
    sheets = pd.read_excel(data, sheet_name=list(SHEET_CONFIG.keys()))

    frames = []
    for sheet_idx, config in SHEET_CONFIG.items():
        print(f"Parsing sheet {sheet_idx}: {config['area_type']} {config['sex']}...")
        df = _normalize_columns(sheets[sheet_idx], config["area_type"])
        df["sex"] = config["sex"]

        # Coerce livestock counts to int; non-numeric cells become 0.
        for col in LIVESTOCK_TYPES:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)

        frames.append(df)

    combined = pd.concat(frames, ignore_index=True)

    # Clean text fields so later joins/groupbys match consistently.
    for col in ["state_name", "district_name", "block_name", "village_name"]:
        if col in combined.columns:
            combined[col] = _clean_text(combined[col])

    # Keep only relevant columns (drop any extra source columns).
    keep_cols = [
        "state_name", "district_name", "block_name", "village_name",
        "area_type", "sex",
    ] + [c for c in LIVESTOCK_TYPES if c in combined.columns]

    combined = combined[[c for c in keep_cols if c in combined.columns]]

    print(f"Loaded {len(combined)} records across {combined['state_name'].nunique()} states")
    return combined


def aggregate_livestock_data(df):
    """Combine male and female records into per-village totals.

    Args:
        df: DataFrame from load_livestock_data()

    Returns:
        pd.DataFrame with one row per village/ward holding summed
        livestock counts plus a total_livestock column.
    """
    key_cols = ["state_name", "district_name", "block_name", "village_name", "area_type"]
    count_cols = [c for c in LIVESTOCK_TYPES if c in df.columns]

    # Sum each livestock column within a village, then add a grand total.
    totals = df.groupby(key_cols, as_index=False)[count_cols].sum()
    totals["total_livestock"] = totals[count_cols].sum(axis=1)

    print(f"Aggregated to {len(totals)} unique village/ward entries")
    return totals
206 changes: 206 additions & 0 deletions computing/misc/livestock_census/gee_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""
GEE Export for Livestock Census

Takes the matched livestock census data, joins it with village boundary
geometries, and publishes the enriched vector layer as an Earth Engine asset.

The output FeatureCollection has per-village polygons with properties:
- cattle, buffalo, sheep, goat, pig, total_livestock
- census2011_id, lgd_village_id
- state_name, district_name, block_name, village_name

This enables filtering/aggregation by livestock type in GEE and
downstream apps like Know Your Landscape.
"""

import ee
import geopandas as gpd
import pandas as pd

from utilities.gee_utils import (
ee_initialize,
gdf_to_ee_fc,
export_vector_asset_to_gee,
check_task_status,
is_gee_asset_exists,
make_asset_public,
valid_gee_text,
get_gee_asset_path,
)
from computing.utils import (
sync_fc_to_geoserver,
save_layer_info_to_db,
update_layer_sync_status,
)
from nrm_app.celery import app

LIVESTOCK_TYPES = ["cattle", "buffalo", "sheep", "goat", "pig"]


def enrich_village_boundaries(matched_csv_path, boundary_shapefile_path):
    """Join matched livestock data with village boundary geometries.

    Args:
        matched_csv_path: Path to the matched livestock CSV
            (output of pipeline.py with census2011_id column)
        boundary_shapefile_path: Path to village boundary shapefile/geojson

    Returns:
        GeoDataFrame with village polygons enriched with livestock attributes

    Raises:
        ValueError: If no census2011 ID column exists in the boundary data.
    """
    livestock_df = pd.read_csv(matched_csv_path)
    boundaries_gdf = gpd.read_file(boundary_shapefile_path)

    # Standardize boundary column names
    boundaries_gdf.columns = [
        c.strip().lower().replace(" ", "_") for c in boundaries_gdf.columns
    ]

    # Find the census ID column in boundary data
    census_col = next(
        (c for c in boundaries_gdf.columns if "census2011" in c or "census_2011" in c),
        None,
    )
    if census_col is None:
        raise ValueError(
            "Could not find census2011 ID column in boundary shapefile. "
            f"Available columns: {list(boundaries_gdf.columns)}"
        )

    # Keep only exact/fuzzy matches and align join keys as strings.
    # NOTE(review): if census2011_id was parsed as float (NaNs in the CSV),
    # astype(str) yields values like "12345.0" that will never match the
    # boundary IDs — confirm the pipeline writes integer/string IDs.
    matched = livestock_df[livestock_df["match_type"].isin(["exact", "fuzzy"])].copy()
    matched["census2011_id"] = matched["census2011_id"].astype(str)
    boundaries_gdf[census_col] = boundaries_gdf[census_col].astype(str)

    # Derive total_livestock when the CSV does not already carry it, so
    # the aggregation below never references a missing column (the old
    # code raised KeyError in that case).
    livestock_cols = [c for c in LIVESTOCK_TYPES if c in matched.columns]
    if "total_livestock" not in matched.columns:
        matched["total_livestock"] = matched[livestock_cols].sum(axis=1)

    # Aggregate livestock counts per census ID (in case of duplicates),
    # guarding every aggregated column on actual presence in the CSV.
    agg_cols = {col: "sum" for col in livestock_cols}
    agg_cols["total_livestock"] = "sum"
    for name_col in ["state_name", "district_name", "block_name", "village_name"]:
        if name_col in matched.columns:
            agg_cols[name_col] = "first"

    livestock_agg = matched.groupby("census2011_id", as_index=False).agg(agg_cols)

    # Join with boundaries; inner join drops unmatched polygons.
    enriched = boundaries_gdf.merge(
        livestock_agg,
        left_on=census_col,
        right_on="census2011_id",
        how="inner",
    )

    # Ensure CRS is EPSG:4326 (GEE expects WGS84 lon/lat).
    if enriched.crs is None:
        enriched = enriched.set_crs("EPSG:4326")
    elif enriched.crs.to_epsg() != 4326:
        enriched = enriched.to_crs("EPSG:4326")

    # Keep only relevant columns + geometry
    keep_cols = [
        "geometry", "census2011_id", "state_name", "district_name",
        "block_name", "village_name",
    ] + [c for c in LIVESTOCK_TYPES if c in enriched.columns] + ["total_livestock"]

    enriched = enriched[[c for c in keep_cols if c in enriched.columns]]

    print(f"Enriched {len(enriched)} village polygons with livestock data")
    return enriched


def export_to_geojson(enriched_gdf, output_path):
    """Write the enriched village layer to a local GeoJSON file.

    Args:
        enriched_gdf: GeoDataFrame produced by enrich_village_boundaries()
        output_path: Destination path for the GeoJSON file
    """
    # GeoJSON driver keeps attributes + geometry in one portable file.
    enriched_gdf.to_file(output_path, driver="GeoJSON")
    print(f"Exported GeoJSON to {output_path}")


@app.task(bind=True)
def publish_livestock_census_to_gee(
    self,
    matched_csv_path,
    boundary_shapefile_path,
    state,
    district,
    block,
    gee_account_id,
):
    """Celery task to publish livestock census as a GEE vector asset.

    Workflow:
        1. Enrich village boundaries with livestock data
        2. Convert to ee.FeatureCollection
        3. Export to GEE as a vector asset
        4. Sync to GeoServer
        5. Save layer info to DB

    Args:
        matched_csv_path: Path to matched livestock CSV
        boundary_shapefile_path: Path to village boundary shapefile
        state, district, block: Location identifiers
        gee_account_id: GEE service account ID

    Note:
        `self` (the bound Celery task instance) is not used in the body.
    """
    # Authenticate against Earth Engine using the given service account.
    ee_initialize(gee_account_id)

    # Asset naming: livestock_census_<district>_<block>, sanitized for GEE.
    description = (
        f"livestock_census_{valid_gee_text(district)}_{valid_gee_text(block)}"
    )
    asset_id = (
        get_gee_asset_path(state, district, block) + description
    )

    # Idempotency guard: skip all work if the asset was already published.
    if is_gee_asset_exists(asset_id):
        print(f"Asset already exists: {asset_id}")
        return

    # Step 1: Enrich boundaries
    print("Enriching village boundaries with livestock data...")
    enriched_gdf = enrich_village_boundaries(
        matched_csv_path, boundary_shapefile_path
    )

    # Nothing matched for this location — publishing an empty layer is useless.
    if enriched_gdf.empty:
        print("No matched villages found for this location. Skipping.")
        return

    # Step 2: Convert to ee.FeatureCollection
    print("Converting to Earth Engine FeatureCollection...")
    fc = gdf_to_ee_fc(enriched_gdf)

    # Step 3: Export to GEE
    print(f"Exporting to GEE asset: {asset_id}")
    task_id = export_vector_asset_to_gee(fc, description, asset_id)

    # NOTE(review): if export_vector_asset_to_gee returns a falsy task_id,
    # the GeoServer sync and DB bookkeeping below are silently skipped with
    # no log message — confirm that failure mode is intended.
    if task_id:
        # check_task_status blocks until the GEE export task settles
        # (presumably — behavior defined in utilities.gee_utils).
        check_task_status(task_id)
        make_asset_public(asset_id)
        print(f"Published livestock census asset: {asset_id}")

        # Step 4: Sync to GeoServer
        layer_name = (
            valid_gee_text(district) + "_" + valid_gee_text(block) + "_livestock_census"
        )
        sync_fc_to_geoserver(asset_id, layer_name)

        # Step 5: Save to DB
        save_layer_info_to_db(
            state=state,
            district=district,
            block=block,
            layer_name=layer_name,
            dataset_name="Livestock Census",
            metadata={
                "source": "DAHD 20th Livestock Census",
                "livestock_types": LIVESTOCK_TYPES,
                "year": 2020,
                "resolution": "village_level",
            },
        )
        update_layer_sync_status(layer_name, status="synced")
        print("Done.")
Loading