# =====================================================================
# computing/misc/livestock_census/__init__.py  (new file, intentionally empty)
# =====================================================================

# =====================================================================
# computing/misc/livestock_census/data_loader.py
# =====================================================================
"""
Livestock Census Data Loader

Downloads and parses the 20th Livestock Census (DAHD) village and ward level
data. The source xlsx contains four sheets:
  - Rural Male Population
  - Rural Female Population
  - Urban Male Population
  - Urban Female Population

Each sheet has columns: state_name, district_name, block_name/town_name,
village_name/ward_name, cattle, buffalo, sheep, goat, pig, ...

This module consolidates all four sheets into a single DataFrame with
standardized columns and an area_type (rural/urban) indicator.
"""

import os
import pandas as pd
import requests
from io import BytesIO

DAHD_URL = (
    "https://www.dahd.gov.in/sites/default/files/2023-07/"
    "VillageAndWardLevelDataMale-Female.xlsx"
)

LIVESTOCK_TYPES = ["cattle", "buffalo", "sheep", "goat", "pig"]

# Sheet index -> (rural/urban, male/female) layout of the source xlsx.
SHEET_CONFIG = {
    0: {"area_type": "rural", "sex": "male"},
    1: {"area_type": "rural", "sex": "female"},
    2: {"area_type": "urban", "sex": "male"},
    3: {"area_type": "urban", "sex": "female"},
}


def download_livestock_data(url=DAHD_URL, cache_dir=None):
    """Download the DAHD livestock census xlsx file.

    Args:
        url: URL of the xlsx file.
        cache_dir: If provided, cache the downloaded bytes locally and
            reuse them on subsequent calls.

    Returns:
        BytesIO object containing the xlsx data.

    Raises:
        requests.HTTPError: if the download fails with a non-2xx status.
    """
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
        cache_path = os.path.join(cache_dir, "livestock_census_20th.xlsx")
        if os.path.exists(cache_path):
            print(f"Using cached file: {cache_path}")
            with open(cache_path, "rb") as f:
                return BytesIO(f.read())

    print(f"Downloading livestock census data from {url}...")
    resp = requests.get(url, timeout=120)
    resp.raise_for_status()
    data = BytesIO(resp.content)

    if cache_dir:
        with open(cache_path, "wb") as f:
            f.write(resp.content)
        print(f"Cached to {cache_path}")

    return data


def _normalize_columns(df, area_type):
    """Standardize column names across rural and urban sheets.

    Rural sheets use block_name/village_name; urban sheets use
    town_name/ward_name. Both are mapped onto block_name/village_name so
    the four sheets can be concatenated, and an area_type column records
    which kind of record each row is.
    """
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    rename_map = {}
    for col in df.columns:
        if "state" in col:
            rename_map[col] = "state_name"
        elif "district" in col:
            rename_map[col] = "district_name"
        elif "block" in col:
            rename_map[col] = "block_name"
        elif "town" in col:
            # Urban "town" plays the role of the rural "block".
            rename_map[col] = "block_name"
        elif "village" in col:
            rename_map[col] = "village_name"
        elif "ward" in col:
            # Urban "ward" plays the role of the rural "village".
            rename_map[col] = "village_name"

    df = df.rename(columns=rename_map)
    df["area_type"] = area_type
    return df


def _clean_text(series):
    """Normalize text: strip, lowercase, collapse whitespace."""
    return (
        series.astype(str)
        .str.strip()
        .str.lower()
        .str.replace(r"\s+", " ", regex=True)
    )


def load_livestock_data(url=DAHD_URL, cache_dir=None):
    """Load and consolidate all four sheets of the livestock census.

    Returns:
        pd.DataFrame with columns:
            state_name, district_name, block_name, village_name,
            area_type, sex, cattle, buffalo, sheep, goat, pig
    """
    data = download_livestock_data(url, cache_dir)

    # PERF: parse the workbook once. Passing a list to sheet_name returns a
    # {sheet_index: DataFrame} dict, instead of re-parsing the whole xlsx
    # (and seek(0)-ing the stream) once per sheet.
    sheets = pd.read_excel(data, sheet_name=list(SHEET_CONFIG.keys()))

    frames = []
    for sheet_idx, config in SHEET_CONFIG.items():
        print(f"Parsing sheet {sheet_idx}: {config['area_type']} {config['sex']}...")
        df = sheets[sheet_idx]
        df = _normalize_columns(df, config["area_type"])
        df["sex"] = config["sex"]

        # Ensure livestock columns are numeric (source cells may hold
        # blanks or text); non-parseable values become 0.
        for col in LIVESTOCK_TYPES:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)

        frames.append(df)

    combined = pd.concat(frames, ignore_index=True)

    # Clean text fields
    for col in ["state_name", "district_name", "block_name", "village_name"]:
        if col in combined.columns:
            combined[col] = _clean_text(combined[col])

    # Keep only relevant columns
    keep_cols = [
        "state_name", "district_name", "block_name", "village_name",
        "area_type", "sex",
    ] + [c for c in LIVESTOCK_TYPES if c in combined.columns]

    combined = combined[[c for c in keep_cols if c in combined.columns]]

    print(f"Loaded {len(combined)} records across {combined['state_name'].nunique()} states")
    return combined


def aggregate_livestock_data(df):
    """Aggregate male + female counts per village to get total livestock.

    Args:
        df: DataFrame from load_livestock_data()

    Returns:
        pd.DataFrame with total counts per village (male + female combined)
        plus a total_livestock column summing all livestock types.
    """
    group_cols = ["state_name", "district_name", "block_name", "village_name", "area_type"]
    livestock_cols = [c for c in LIVESTOCK_TYPES if c in df.columns]

    agg = df.groupby(group_cols, as_index=False)[livestock_cols].sum()
    agg["total_livestock"] = agg[livestock_cols].sum(axis=1)

    print(f"Aggregated to {len(agg)} unique village/ward entries")
    return agg


# =====================================================================
# computing/misc/livestock_census/gee_export.py
# =====================================================================
"""
GEE Export for Livestock Census

Takes the matched livestock census data, joins it with village boundary
geometries, and publishes the enriched vector layer as an Earth Engine asset.

The output FeatureCollection has per-village polygons with properties:
  - cattle, buffalo, sheep, goat, pig, total_livestock
  - census2011_id, lgd_village_id
  - state_name, district_name, block_name, village_name

This enables filtering/aggregation by livestock type in GEE and
downstream apps like Know Your Landscape.
"""

import ee
import geopandas as gpd
import pandas as pd

from utilities.gee_utils import (
    ee_initialize,
    gdf_to_ee_fc,
    export_vector_asset_to_gee,
    check_task_status,
    is_gee_asset_exists,
    make_asset_public,
    valid_gee_text,
    get_gee_asset_path,
)
from computing.utils import (
    sync_fc_to_geoserver,
    save_layer_info_to_db,
    update_layer_sync_status,
)
from nrm_app.celery import app

LIVESTOCK_TYPES = ["cattle", "buffalo", "sheep", "goat", "pig"]


def enrich_village_boundaries(matched_csv_path, boundary_shapefile_path):
    """Join matched livestock data with village boundary geometries.

    Args:
        matched_csv_path: Path to the matched livestock CSV
            (output of pipeline.py with census2011_id column)
        boundary_shapefile_path: Path to village boundary shapefile/geojson

    Returns:
        GeoDataFrame with village polygons enriched with livestock attributes,
        reprojected to EPSG:4326.

    Raises:
        ValueError: if no census2011 ID column can be found in the
            boundary file.
    """
    livestock_df = pd.read_csv(matched_csv_path)
    boundaries_gdf = gpd.read_file(boundary_shapefile_path)

    # Standardize boundary column names
    boundaries_gdf.columns = [
        c.strip().lower().replace(" ", "_") for c in boundaries_gdf.columns
    ]

    # Find the census ID column in boundary data
    census_col = None
    for col in boundaries_gdf.columns:
        if "census2011" in col or "census_2011" in col:
            census_col = col
            break

    if census_col is None:
        raise ValueError(
            "Could not find census2011 ID column in boundary shapefile. "
            f"Available columns: {list(boundaries_gdf.columns)}"
        )

    # Filter to only matched records; compare IDs as strings since dtypes
    # differ between the CSV and the shapefile attribute table.
    matched = livestock_df[livestock_df["match_type"].isin(["exact", "fuzzy"])].copy()
    matched["census2011_id"] = matched["census2011_id"].astype(str)
    boundaries_gdf[census_col] = boundaries_gdf[census_col].astype(str)

    # Aggregate livestock counts per census ID (in case of duplicates).
    # Only aggregate columns that actually exist in the CSV so a missing
    # total_livestock column does not raise KeyError.
    agg_cols = {col: "sum" for col in LIVESTOCK_TYPES if col in matched.columns}
    if "total_livestock" in matched.columns:
        agg_cols["total_livestock"] = "sum"
    agg_cols["state_name"] = "first"
    agg_cols["district_name"] = "first"
    agg_cols["block_name"] = "first"
    agg_cols["village_name"] = "first"

    livestock_agg = matched.groupby("census2011_id", as_index=False).agg(agg_cols)

    # Recompute the total if the input CSV did not carry it.
    if "total_livestock" not in livestock_agg.columns:
        present = [c for c in LIVESTOCK_TYPES if c in livestock_agg.columns]
        livestock_agg["total_livestock"] = livestock_agg[present].sum(axis=1)

    # Join with boundaries.
    # BUGFIX: boundary files commonly carry their own state/district/village
    # name columns (normalized above), so without explicit suffixes pandas
    # would rename the overlapping livestock columns to *_y and the
    # keep_cols filter below would silently drop them. Give the boundary
    # side the suffix and keep the livestock names canonical.
    enriched = boundaries_gdf.merge(
        livestock_agg,
        left_on=census_col,
        right_on="census2011_id",
        how="inner",
        suffixes=("_bnd", ""),
    )

    # Ensure CRS is EPSG:4326
    if enriched.crs is None:
        enriched = enriched.set_crs("EPSG:4326")
    elif enriched.crs.to_epsg() != 4326:
        enriched = enriched.to_crs("EPSG:4326")

    # Keep only relevant columns + geometry
    keep_cols = [
        "geometry", "census2011_id", "state_name", "district_name",
        "block_name", "village_name",
    ] + [c for c in LIVESTOCK_TYPES if c in enriched.columns] + ["total_livestock"]

    enriched = enriched[[c for c in keep_cols if c in enriched.columns]]

    print(f"Enriched {len(enriched)} village polygons with livestock data")
    return enriched


def export_to_geojson(enriched_gdf, output_path):
    """Export enriched GeoDataFrame to GeoJSON for local use.

    Args:
        enriched_gdf: GeoDataFrame from enrich_village_boundaries()
        output_path: Path to write GeoJSON file
    """
    enriched_gdf.to_file(output_path, driver="GeoJSON")
    print(f"Exported GeoJSON to {output_path}")


@app.task(bind=True)
def publish_livestock_census_to_gee(
    self,
    matched_csv_path,
    boundary_shapefile_path,
    state,
    district,
    block,
    gee_account_id,
):
    """Celery task to publish livestock census as a GEE vector asset.

    Workflow:
        1. Enrich village boundaries with livestock data
        2. Convert to ee.FeatureCollection
        3. Export to GEE as a vector asset
        4. Sync to GeoServer
        5. Save layer info to DB

    Args:
        matched_csv_path: Path to matched livestock CSV
        boundary_shapefile_path: Path to village boundary shapefile
        state, district, block: Location identifiers
        gee_account_id: GEE service account ID
    """
    ee_initialize(gee_account_id)

    description = (
        f"livestock_census_{valid_gee_text(district)}_{valid_gee_text(block)}"
    )
    asset_id = (
        get_gee_asset_path(state, district, block) + description
    )

    # Idempotency guard: re-running the task for an existing asset is a no-op.
    if is_gee_asset_exists(asset_id):
        print(f"Asset already exists: {asset_id}")
        return

    # Step 1: Enrich boundaries
    print("Enriching village boundaries with livestock data...")
    enriched_gdf = enrich_village_boundaries(
        matched_csv_path, boundary_shapefile_path
    )

    if enriched_gdf.empty:
        print("No matched villages found for this location. Skipping.")
        return

    # Step 2: Convert to ee.FeatureCollection
    print("Converting to Earth Engine FeatureCollection...")
    fc = gdf_to_ee_fc(enriched_gdf)

    # Step 3: Export to GEE
    print(f"Exporting to GEE asset: {asset_id}")
    task_id = export_vector_asset_to_gee(fc, description, asset_id)

    if task_id:
        check_task_status(task_id)
        make_asset_public(asset_id)
        print(f"Published livestock census asset: {asset_id}")

        # Step 4: Sync to GeoServer
        layer_name = (
            valid_gee_text(district) + "_" + valid_gee_text(block) + "_livestock_census"
        )
        sync_fc_to_geoserver(asset_id, layer_name)

        # Step 5: Save to DB
        save_layer_info_to_db(
            state=state,
            district=district,
            block=block,
            layer_name=layer_name,
            dataset_name="Livestock Census",
            metadata={
                "source": "DAHD 20th Livestock Census",
                "livestock_types": LIVESTOCK_TYPES,
                "year": 2020,
                "resolution": "village_level",
            },
        )
        update_layer_sync_status(layer_name, status="synced")
        print("Done.")


# =====================================================================
# computing/misc/livestock_census/pipeline.py
# =====================================================================
"""
Livestock Census Pipeline

End-to-end pipeline to:
1. Download and parse the 20th Livestock Census (DAHD) data
2. Aggregate male + female counts per village
3. Fuzzy match village names to CoRE Stack village boundaries
4. Export matched data as CSV for further GEE integration
5. Report match statistics per state

Usage:
    python -m computing.misc.livestock_census.pipeline \
        --boundaries-dir /path/to/village_boundaries \
        --output-dir /path/to/output \
        --cache-dir /path/to/cache

Or import and use programmatically:
    from computing.misc.livestock_census.pipeline import run_pipeline
    results = run_pipeline(boundaries_dir="...", output_dir="...")
"""

import os
import argparse
import json

from .data_loader import load_livestock_data, aggregate_livestock_data
from .village_matcher import (
    load_village_boundaries,
    match_villages,
    match_stats_by_state,
)


def run_pipeline(
    boundaries_dir,
    output_dir,
    cache_dir=None,
    data_url=None,
    states=None,
    similarity_threshold=0.80,
):
    """Run the full livestock census data processing pipeline.

    Args:
        boundaries_dir: Path to directory with state-wise village boundary files
        output_dir: Path to write output CSV and stats
        cache_dir: Optional path to cache downloaded xlsx
        data_url: Override the default DAHD data URL
        states: List of state names to process (None = all)
        similarity_threshold: Minimum score for fuzzy matching

    Returns:
        dict with keys: matched_df, stats_by_state, overall_stats
    """
    os.makedirs(output_dir, exist_ok=True)

    # Step 1: Load and parse livestock census data
    print("=" * 60)
    print("Step 1: Loading livestock census data...")
    print("=" * 60)
    kwargs = {}
    if data_url:
        kwargs["url"] = data_url
    if cache_dir:
        kwargs["cache_dir"] = cache_dir

    raw_df = load_livestock_data(**kwargs)
    print(f"  Raw records: {len(raw_df)}")
    print(f"  States: {raw_df['state_name'].nunique()}")
    print(f"  Columns: {list(raw_df.columns)}")

    # Step 2: Aggregate male + female counts
    print("\n" + "=" * 60)
    print("Step 2: Aggregating livestock counts (male + female)...")
    print("=" * 60)
    agg_df = aggregate_livestock_data(raw_df)

    if states:
        # state_name has already been lowercased by the loader, so the
        # requested names are normalized the same way before filtering.
        states_lower = [s.lower().strip() for s in states]
        agg_df = agg_df[agg_df["state_name"].isin(states_lower)]
        print(f"  Filtered to {len(states)} states: {states}")

    # Save aggregated data
    agg_path = os.path.join(output_dir, "livestock_census_aggregated.csv")
    agg_df.to_csv(agg_path, index=False)
    print(f"  Saved aggregated data to {agg_path}")

    # Step 3: Load village boundaries
    print("\n" + "=" * 60)
    print("Step 3: Loading village boundaries...")
    print("=" * 60)
    boundaries_df = load_village_boundaries(boundaries_dir)
    print(f"  Boundary records: {len(boundaries_df)}")

    # Step 4: Match villages
    print("\n" + "=" * 60)
    print("Step 4: Matching villages (this may take a while)...")
    print("=" * 60)
    matched_df, overall_stats = match_villages(
        agg_df, boundaries_df, similarity_threshold=similarity_threshold
    )

    # Save matched data
    matched_path = os.path.join(output_dir, "livestock_census_matched.csv")
    matched_df.to_csv(matched_path, index=False)
    print(f"\n  Saved matched data to {matched_path}")

    # Step 5: Compute and save statistics
    print("\n" + "=" * 60)
    print("Step 5: Computing match statistics...")
    print("=" * 60)
    state_stats = match_stats_by_state(matched_df)

    stats_path = os.path.join(output_dir, "match_stats_by_state.csv")
    state_stats.to_csv(stats_path, index=False)

    overall_path = os.path.join(output_dir, "match_stats_overall.json")
    with open(overall_path, "w") as f:
        json.dump(overall_stats, f, indent=2)

    print("\n  Overall match statistics:")
    print(f"    Total records: {overall_stats['total_records']}")
    print(f"    Exact matches: {overall_stats['exact_matches']} ({overall_stats['exact_match_pct']}%)")
    print(f"    Fuzzy matches: {overall_stats['fuzzy_matches']} ({overall_stats['fuzzy_match_pct']}%)")
    print(f"    Unmatched: {overall_stats['unmatched']}")
    print(f"    Total match %: {overall_stats['total_match_pct']}%")

    print(f"\n  Per-state stats saved to {stats_path}")
    print(f"  Overall stats saved to {overall_path}")

    return {
        "matched_df": matched_df,
        "stats_by_state": state_stats,
        "overall_stats": overall_stats,
    }


def main():
    """CLI entry point: parse arguments and run the pipeline."""
    parser = argparse.ArgumentParser(
        description="Process 20th Livestock Census data and match to village boundaries"
    )
    parser.add_argument(
        "--boundaries-dir", required=True,
        help="Directory containing state-wise village boundary shapefiles"
    )
    parser.add_argument(
        "--output-dir", required=True,
        help="Directory to write output files"
    )
    parser.add_argument(
        "--cache-dir", default=None,
        help="Directory to cache downloaded data"
    )
    parser.add_argument(
        "--data-url", default=None,
        help="Override default DAHD data URL"
    )
    parser.add_argument(
        "--states", nargs="*", default=None,
        help="Process only these states (space-separated)"
    )
    parser.add_argument(
        "--threshold", type=float, default=0.80,
        help="Fuzzy match similarity threshold (default: 0.80)"
    )

    args = parser.parse_args()
    run_pipeline(
        boundaries_dir=args.boundaries_dir,
        output_dir=args.output_dir,
        cache_dir=args.cache_dir,
        data_url=args.data_url,
        states=args.states,
        similarity_threshold=args.threshold,
    )


if __name__ == "__main__":
    main()


# =====================================================================
# computing/misc/livestock_census/village_matcher.py
# =====================================================================
"""
Village Name Matcher

Performs fuzzy matching between livestock census village names and
CoRE Stack village boundary records. Matching is done hierarchically:

  1. Exact match on state-district-block-village key
  2. Fuzzy match using edit distance (Levenshtein) on village name
     within the same state-district-block

The output maps each livestock census record to a census2011 village ID
(or lgd_village key) from the CoRE Stack boundary dataset.
"""

import pandas as pd
import geopandas as gpd
from difflib import SequenceMatcher
from unidecode import unidecode
import os
import json


def _normalize(text):
    """Normalize village/district name for matching."""
    if not isinstance(text, str):
        return ""
    text = unidecode(text).strip().lower()
    # Remove common census suffixes (census town, cantonment board,
    # outgrowth, municipality, ...) that vary across datasets.
    for suffix in [" (ct)", " (cb)", " (og)", " (m)", " (tp)", " (np)", " (m cl)"]:
        text = text.replace(suffix, "")
    # Collapse whitespace
    text = " ".join(text.split())
    return text


def _similarity(a, b):
    """Compute similarity ratio between two strings (0.0 - 1.0)."""
    return SequenceMatcher(None, a, b).ratio()


def load_village_boundaries(shapefile_dir, state_name=None):
    """Load village boundary shapefiles from the CoRE Stack dataset.

    The shapefiles are organized by state. Each has columns including
    village name, census2011 id, lgd_village key, district, subdistrict.

    Args:
        shapefile_dir: Path to directory containing state-wise shapefiles
        state_name: If provided, load only this state's boundaries

    Returns:
        pd.DataFrame with boundary village records (geometry dropped to
        save memory — only attributes are needed for matching)

    Raises:
        FileNotFoundError: if no boundary file could be loaded.
    """
    frames = []

    if state_name:
        # Try to find matching file
        for f in os.listdir(shapefile_dir):
            if f.endswith(".geojson") or f.endswith(".shp"):
                if _normalize(state_name) in _normalize(f):
                    path = os.path.join(shapefile_dir, f)
                    gdf = gpd.read_file(path)
                    # Drop geometry to save memory for matching
                    df = pd.DataFrame(gdf.drop(columns="geometry"))
                    frames.append(df)
    else:
        for f in sorted(os.listdir(shapefile_dir)):
            if f.endswith(".geojson") or f.endswith(".shp"):
                path = os.path.join(shapefile_dir, f)
                try:
                    gdf = gpd.read_file(path)
                    df = pd.DataFrame(gdf.drop(columns="geometry"))
                    frames.append(df)
                    print(f"  Loaded {f}: {len(df)} villages")
                except Exception as e:
                    # Best-effort: a single unreadable state file should not
                    # abort loading the rest.
                    print(f"  Error loading {f}: {e}")

    if not frames:
        raise FileNotFoundError(f"No boundary files found in {shapefile_dir}")

    boundaries = pd.concat(frames, ignore_index=True)

    # Normalize column names
    boundaries.columns = [c.strip().lower().replace(" ", "_") for c in boundaries.columns]

    return boundaries


def _detect_boundary_columns(boundaries_df):
    """Auto-detect which columns in the boundary dataset correspond to
    state, district, subdistrict, village name and village ID.

    Returns a dict with (some of) the keys: state, district, subdistrict,
    village, census_id, lgd_id. Callers must check for required keys.
    """
    cols = boundaries_df.columns.tolist()
    mapping = {}

    for col in cols:
        cl = col.lower()
        if "state" in cl and "name" in cl:
            mapping["state"] = col
        elif "district" in cl and "name" in cl:
            mapping["district"] = col
        elif "subdist" in cl and "name" in cl:
            mapping["subdistrict"] = col
        elif "village" in cl and "name" in cl:
            mapping["village"] = col
        elif "census2011" in cl or "census_2011" in cl:
            mapping["census_id"] = col
        elif "lgd_village" in cl or "lgd" in cl:
            mapping["lgd_id"] = col

    # Fallback heuristics for datasets without "*_name" style columns
    if "state" not in mapping:
        for col in cols:
            if col.lower() in ["state", "state_ut"]:
                mapping["state"] = col
                break
    if "village" not in mapping:
        for col in cols:
            if "village" in col.lower() and "code" not in col.lower():
                mapping["village"] = col
                break
    if "census_id" not in mapping:
        for col in cols:
            if "census" in col.lower() and "code" in col.lower():
                mapping["census_id"] = col
                break

    return mapping


def match_villages(livestock_df, boundaries_df, similarity_threshold=0.80):
    """Match livestock census villages to boundary dataset villages.

    Strategy:
        1. Build a lookup of boundary villages keyed by
           normalized (state, district, subdistrict/block)
        2. For each livestock record, first try exact match on village name
        3. If no exact match, try fuzzy match above the threshold

    Args:
        livestock_df: Aggregated livestock census DataFrame
        boundaries_df: Village boundaries DataFrame
        similarity_threshold: Minimum similarity score for fuzzy match

    Returns:
        tuple: (matched_df, match_stats)
            matched_df has additional columns: matched_village_name,
            census2011_id, lgd_village_id, match_type, match_score
    """
    col_map = _detect_boundary_columns(boundaries_df)
    print(f"Detected boundary columns: {json.dumps(col_map, indent=2)}")

    if "village" not in col_map:
        raise ValueError("Could not detect village name column in boundary dataset")

    # Build lookup: (state, district, subdistrict) -> list of village records
    boundary_lookup = {}
    for _, row in boundaries_df.iterrows():
        state = _normalize(str(row.get(col_map.get("state", ""), "")))
        district = _normalize(str(row.get(col_map.get("district", ""), "")))
        subdistrict = _normalize(str(row.get(col_map.get("subdistrict", ""), "")))
        village = _normalize(str(row.get(col_map["village"], "")))

        key = (state, district, subdistrict)
        census_id = row.get(col_map.get("census_id", ""), "")
        lgd_id = row.get(col_map.get("lgd_id", ""), "")

        if key not in boundary_lookup:
            boundary_lookup[key] = []
        boundary_lookup[key].append({
            "village": village,
            "original_name": str(row.get(col_map["village"], "")),
            "census_id": census_id,
            "lgd_id": lgd_id,
        })

    # Match each livestock record
    results = []
    match_counts = {"exact": 0, "fuzzy": 0, "unmatched": 0}

    total = len(livestock_df)
    # BUGFIX: use enumerate for the progress counter. The DataFrame index is
    # not guaranteed to be a contiguous RangeIndex (e.g. after filtering by
    # state upstream), so `idx % 10000` on the raw index misfires.
    for pos, (_, row) in enumerate(livestock_df.iterrows()):
        if pos % 10000 == 0 and pos > 0:
            print(f"  Processed {pos}/{total} records...")

        state = _normalize(str(row.get("state_name", "")))
        district = _normalize(str(row.get("district_name", "")))
        block = _normalize(str(row.get("block_name", "")))
        village = _normalize(str(row.get("village_name", "")))

        # Try lookup with (state, district, block)
        key = (state, district, block)
        candidates = boundary_lookup.get(key, [])

        # Also try without block (sometimes block/subdistrict names differ).
        # Safe to extend: this branch only runs when the key was missing, so
        # `candidates` is the fresh default list, not a shared lookup entry.
        if not candidates:
            for k, v in boundary_lookup.items():
                if k[0] == state and k[1] == district:
                    candidates.extend(v)

        matched_name = ""
        census_id = ""
        lgd_id = ""
        match_type = "unmatched"
        match_score = 0.0

        if candidates:
            # Exact match first
            for c in candidates:
                if c["village"] == village:
                    matched_name = c["original_name"]
                    census_id = c["census_id"]
                    lgd_id = c["lgd_id"]
                    match_type = "exact"
                    match_score = 1.0
                    break

            # Fuzzy match if no exact
            if match_type == "unmatched" and village:
                best_score = 0
                best_candidate = None
                for c in candidates:
                    score = _similarity(village, c["village"])
                    if score > best_score:
                        best_score = score
                        best_candidate = c

                if best_score >= similarity_threshold and best_candidate:
                    matched_name = best_candidate["original_name"]
                    census_id = best_candidate["census_id"]
                    lgd_id = best_candidate["lgd_id"]
                    match_type = "fuzzy"
                    match_score = best_score

        match_counts[match_type] = match_counts.get(match_type, 0) + 1

        results.append({
            "matched_village_name": matched_name,
            "census2011_id": census_id,
            "lgd_village_id": lgd_id,
            "match_type": match_type,
            "match_score": round(match_score, 3),
        })

    result_df = pd.DataFrame(results)
    # Row order of `results` mirrors iteration order, so a positional concat
    # against the reset-index livestock frame aligns correctly.
    matched_df = pd.concat([livestock_df.reset_index(drop=True), result_df], axis=1)

    # Compute stats
    match_stats = {
        "total_records": total,
        "exact_matches": match_counts["exact"],
        "fuzzy_matches": match_counts["fuzzy"],
        "unmatched": match_counts["unmatched"],
        "exact_match_pct": round(100 * match_counts["exact"] / max(total, 1), 2),
        "fuzzy_match_pct": round(100 * match_counts["fuzzy"] / max(total, 1), 2),
        "total_match_pct": round(
            100 * (match_counts["exact"] + match_counts["fuzzy"]) / max(total, 1), 2
        ),
    }

    return matched_df, match_stats


def match_stats_by_state(matched_df):
    """Compute match statistics per state.

    Returns:
        pd.DataFrame with columns: state, total, exact, fuzzy, unmatched,
        match_pct — sorted by match_pct descending.
    """
    stats = []
    for state, group in matched_df.groupby("state_name"):
        total = len(group)
        # Cast numpy scalars to plain ints so the frame serializes cleanly.
        exact = int((group["match_type"] == "exact").sum())
        fuzzy = int((group["match_type"] == "fuzzy").sum())
        unmatched = int((group["match_type"] == "unmatched").sum())
        stats.append({
            "state": state,
            "total": total,
            "exact": exact,
            "fuzzy": fuzzy,
            "unmatched": unmatched,
            "match_pct": round(100 * (exact + fuzzy) / max(total, 1), 2),
        })

    return pd.DataFrame(stats).sort_values("match_pct", ascending=False)