From 0e0240b09e94f6a69c513e822cb9f40d697ffc86 Mon Sep 17 00:00:00 2001 From: amit-spatial Date: Wed, 1 Apr 2026 21:46:15 +0000 Subject: [PATCH 1/3] added general script to quickly download data.gov data --- public_api/api.py | 268 +++++++++++++++ public_api/urls.py | 5 + utilities/scripts/fetch_datagov.py | 525 +++++++++++++++++++++++++++++ 3 files changed, 798 insertions(+) create mode 100644 utilities/scripts/fetch_datagov.py diff --git a/public_api/api.py b/public_api/api.py index 795a8dfa..7052327d 100644 --- a/public_api/api.py +++ b/public_api/api.py @@ -2,6 +2,7 @@ from rest_framework.response import Response from rest_framework import status from django.http import JsonResponse +from django.db.models import Q from utilities.gee_utils import ( valid_gee_text, ) @@ -19,6 +20,10 @@ get_mws_geometries_data, get_village_geometries_data, ) +from computing.models import Layer, Dataset, LayerType +from geoadmin.models import StateSOI, DistrictSOI, TehsilSOI +from stats_generator.utils import get_url +from nrm_app.settings import GEOSERVER_URL from utilities.auth_check_decorator import api_security_check from drf_yasg.utils import swagger_auto_schema from .swagger_schemas import ( @@ -487,3 +492,266 @@ def get_village_geometries(request): {"error": f"Internal server error: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) + + +############# Get Layer Manifest for GeoNode/QGIS ################## +@api_security_check(auth_type="API_key") +def get_layer_manifest(request): + """ + Return a GeoNode/QGIS-ready manifest of all layers. + + Query parameters: + - state: Filter by state (optional) + - district: Filter by district (optional) + - tehsil: Filter by tehsil/block (optional) + - all_active: Return all active locations (optional, boolean) + - format: Output format - 'json' or 'csv' (default: json) + """ + from datetime import datetime, timezone + from urllib.parse import parse_qs, urlparse + from django.http import HttpResponse + import csv + from io import StringIO + + def raster_tiff_download_url(workspace, layer_name): + return ( + f"{GEOSERVER_URL}/{workspace}/wcs?service=WCS&version=2.0.1" + f"&request=GetCoverage&CoverageId={workspace}:{layer_name}" + f"&format=geotiff&compression=LZW&tiling=true&tileheight=256&tilewidth=256" + ) + + def infer_service_details(layer_url): + if not layer_url: + return { + "service": "", "workspace": "", "resource_name": "", + "resource_identifier": "", "geoserver_root": "", "ows_url": "", "wms_url": "", + } + parsed = urlparse(layer_url) + query_params = parse_qs(parsed.query) + + path = parsed.path or "" + marker_index = path.find("/geoserver") + if marker_index == -1: + geoserver_root = f"{parsed.scheme}://{parsed.netloc}" + else: + prefix = path[: marker_index + len("/geoserver")] + geoserver_root = f"{parsed.scheme}://{parsed.netloc}{prefix}" + + service = query_params.get("service", [""])[0].upper() + workspace = "" + resource_name = "" + resource_identifier = "" + + if "typeName" in query_params and query_params["typeName"]: + resource_identifier = query_params["typeName"][0] + if ":" in resource_identifier: + workspace, resource_name = resource_identifier.split(":", 1) + elif "CoverageId" in query_params and query_params["CoverageId"]: + resource_identifier = query_params["CoverageId"][0] + if ":" in resource_identifier: + workspace, resource_name = resource_identifier.split(":", 1) + + ows_url = f"{geoserver_root}/{workspace}/ows" if workspace else "" + wms_url = f"{geoserver_root}/wms" if geoserver_root else "" + + return { + 
"service": service, "workspace": workspace, + "resource_name": resource_name, "resource_identifier": resource_identifier, + "geoserver_root": geoserver_root, "ows_url": ows_url, "wms_url": wms_url, + } + + def infer_qgis_provider(layer_type, service): + lt = str(layer_type).strip().lower() + sv = str(service).strip().lower() + if lt in {"vector", "point"} or sv == "wfs": + return "WFS" + if lt == "raster" or sv == "wcs": + return "WCS" + if sv == "wms": + return "WMS" + return "" + + def infer_download_format(provider): + p = str(provider).strip().lower() + if p == "wfs": + return "GeoJSON" + if p == "wcs": + return "GeoTIFF" + if p == "wms": + return "Rendered map image" + return "" + + def infer_style_format(style_url): + lowered = str(style_url).lower() + if lowered.endswith(".qml"): + return "QML" + if lowered.endswith(".sld"): + return "SLD" + if lowered.endswith(".json"): + return "JSON" + return "" + + try: + state = request.query_params.get("state", "").lower() + district = request.query_params.get("district", "").lower() + tehsil = request.query_params.get("tehsil", "").lower() + all_active = request.query_params.get("all_active", "").lower() == "true" + output_format = request.query_params.get("format", "json").lower() + + # Build base queryset + layers_qs = Layer.objects.select_related( + "dataset", "state", "district", "block" + ).filter(is_sync_to_geoserver=True) + + # Filter by location + locations = [] + if all_active: + locations = ( + layers_qs.values("state__state_name", "district__district_name", "block__tehsil_name") + .distinct() + ) + locations = [ + { + "state": loc["state__state_name"], + "district": loc["district__district_name"], + "tehsil": loc["block__tehsil_name"], + } + for loc in locations + ] + elif state and district and tehsil: + locations = [{"state": state, "district": district, "tehsil": tehsil}] + else: + locations = [{"state": state, "district": district, "tehsil": tehsil}] + + # Collect layer records + all_records = [] + EXCLUDE_KEYWORDS = ["run_off", "evapotranspiration", "precipitation"] + + for loc in locations: + filters = Q(is_sync_to_geoserver=True) + if loc.get("state"): + filters &= Q(state__state_name__iexact=loc["state"]) + if loc.get("district"): + filters &= Q(district__district_name__iexact=loc["district"]) + if loc.get("tehsil"): + filters &= Q(block__tehsil_name__iexact=loc["tehsil"]) + + for kw in EXCLUDE_KEYWORDS: + filters &= ~Q(layer_name__icontains=kw) + + location_layers = layers_qs.filter(filters).order_by("layer_name", "-layer_version") + + # Deduplicate + seen = {} + for layer in location_layers: + name = layer.layer_name.lower() + if name not in seen: + seen[name] = layer + else: + cv = float(seen[name].layer_version or 0) + nv = float(layer.layer_version or 0) + if nv > cv: + seen[name] = layer + + for layer in seen.values(): + dataset = layer.dataset + workspace = dataset.workspace or "" + layer_type = dataset.layer_type or "" + layer_name = layer.layer_name or "" + + # Get style URLs + style_url = "" + sld_url = "" + if dataset.misc: + style_url = dataset.misc.get("style_url", "") + sld_url = dataset.misc.get("sld_url", "") + + # Generate layer URL + if layer_type in [LayerType.VECTOR, LayerType.POINT]: + layer_url = get_url(workspace, layer_name) + elif layer_type == LayerType.RASTER: + layer_url = raster_tiff_download_url(workspace, layer_name) + else: + layer_url = "" + + service_details = infer_service_details(layer_url) + qgis_provider = infer_qgis_provider(layer_type, service_details["service"]) + + 
all_records.append({ + "state": layer.state.state_name.lower() if layer.state else "", + "district": layer.district.district_name.lower() if layer.district else "", + "tehsil": layer.block.tehsil_name.lower() if layer.block else "", + "dataset_name": dataset.name or "", + "layer_name": layer_name, + "layer_type": layer_type, + "layer_version": layer.layer_version or "", + "layer_url": layer_url, + "style_url": style_url, + "sld_url": sld_url, + "style_format": infer_style_format(style_url), + "sld_format": "SLD" if sld_url else "", + "gee_asset_path": layer.gee_asset_path or "", + "service_type": service_details["service"], + "workspace": service_details["workspace"], + "resource_identifier": service_details["resource_identifier"], + "resource_name": service_details["resource_name"] or layer_name, + "geoserver_root": service_details["geoserver_root"], + "ows_url": service_details["ows_url"], + "wms_url": service_details["wms_url"], + "qgis_provider": qgis_provider, + "download_format": infer_download_format(qgis_provider), + "geonode_publish_strategy": "remote-service-from-geoserver", + }) + + # Build summary + unique_wms = sorted(set(r["wms_url"] for r in all_records if r.get("wms_url"))) + unique_ws = sorted(set(r["workspace"] for r in all_records if r.get("workspace"))) + type_counts = {} + for r in all_records: + lt = r.get("layer_type", "unknown") or "unknown" + type_counts[lt] = type_counts.get(lt, 0) + 1 + + manifest = { + "generated_at": datetime.now(timezone.utc).isoformat(), + "source": "django_database", + "scope": { + "all_active_locations": all_active, + "requested_state": state, + "requested_district": district, + "requested_tehsil": tehsil, + "location_count": len(locations), + }, + "summary": { + "layer_count": len(all_records), + "layer_type_counts": type_counts, + "unique_workspaces": unique_ws, + "unique_wms_urls": unique_wms, + }, + "locations": locations, + "layers": all_records, + } + + if output_format == "csv": + if not all_records: + return Response( + {"error": "No layers found for the specified location(s)"}, + status=status.HTTP_404_NOT_FOUND, + ) + fieldnames = list(all_records[0].keys()) + output = StringIO() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(all_records) + + response = HttpResponse(output.getvalue(), content_type="text/csv") + response["Content-Disposition"] = "attachment; filename=core_stack_manifest.csv" + return response + + return Response(manifest, status=status.HTTP_200_OK) + + except Exception as e: + print(f"Error in get_layer_manifest: {str(e)}") + return Response( + {"status": "error", "message": str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) diff --git a/public_api/urls.py b/public_api/urls.py index 8fac1f2b..58b5b786 100644 --- a/public_api/urls.py +++ b/public_api/urls.py @@ -32,4 +32,9 @@ api.get_village_geometries, name="get-village-geometries", ), + path( + "get_layer_manifest/", + api.get_layer_manifest, + name="get-layer-manifest", + ), ] diff --git a/utilities/scripts/fetch_datagov.py b/utilities/scripts/fetch_datagov.py new file mode 100644 index 00000000..bb6a0a8c --- /dev/null +++ b/utilities/scripts/fetch_datagov.py @@ -0,0 +1,525 @@ +""" +fetch_datagov.py + +A robust, modular, and resumable downloader for data.gov.in resources. + +FEATURES: +--------- +- Resource Dictionary: Use short names (e.g., 'VillageSHG') instead of UUIDs. +- Parallel Fetching: Multi-threaded downloads for speed. +- Storage Backends: CSV, JSON, and JSONL (JSON Lines). 
+- In-Memory Buffering: Option to keep data in RAM and flush periodically (safer for crashes).
+- Deduplication: Ensures clean restarts based on a unique key.
+- Resumable: Automatically detects existing file size to resume.
+- Ignore-Total: The --ignore-total flag forces continuous downloading if the API count is wrong.
+
+USAGE:
+------
+The script resolves 'resource_name' from an internal dictionary.
+If the name is not found, it assumes the input is a raw Resource ID (UUID).
+
+1. Using Short Names (fastest, recommended):
+   python fetch_datagov.py VillageSHG --api-key YOUR_KEY --format jsonl --workers 5
+
+2. Using Raw Resource ID (Fallback):
+   python fetch_datagov.py d4206736-a28b-4552-8900-7e0c23c707ac --api-key YOUR_KEY
+
+3. In-Memory Aggregation (Process data in Python, save to disk periodically):
+   python fetch_datagov.py MarketCommodityPrice --api-key YOUR_KEY \
+       --in-memory --buffer-size 5000 --format jsonl --output prices.jsonl
+
+4. Standard CSV Download (Sequential):
+   python fetch_datagov.py VillageSHG --api-key YOUR_KEY --format csv
+
+5. Force Download (If API reports wrong total count):
+   python fetch_datagov.py MarketCommodityPrice --api-key YOUR_KEY --ignore-total
+
+DEPENDENCIES:
+-------------
+pip install requests pandas tqdm python-dotenv
+"""
+
+import os
+import sys
+import argparse
+import time
+import random
+import json
+import threading
+from typing import List, Dict, Optional, Set, Any, Union
+
+import requests
+import pandas as pd
+from tqdm import tqdm
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dotenv import load_dotenv
+
+# =============================================================================
+# CONFIGURATION & CONSTANTS
+# =============================================================================
+
+BASE_URL = "https://api.data.gov.in/resource/"
+DEFAULT_LIMIT = 5000
+DEFAULT_TIMEOUT = 60
+REQUEST_RETRIES = 3
+SLEEP_BASE = 2
+PAUSE_BETWEEN_REQUESTS = 0.5
+
+# Dictionary of known resources. Add yours here.
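+# Keys are the short names accepted on the command line; values are the
+# corresponding data.gov.in resource UUIDs.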
+KNOWN_RESOURCES = {
+    "lgd_village": "c967fe8f-69c4-42df-8afc-8a2c98057437",
+    "lgd_panchayat": "1a6c26ed-d67c-40ea-aa20-d38d35f341a5",
+    "lgd_subdistrict": "6be51a29-876a-403a-a6da-42fde795e751",
+    "lgd_district": "37231365-78ba-44d5-ac22-3deec40b9197",
+    "lgd_state": "a71e60f0-a21d-43de-a6c5-fa5d21600cdb",
+    "VillageSHG": "d4206736-a28b-4552-8900-7e0c23c707ac",
+    "mgnrega": "ee03643a-ee4c-48c2-ac30-9f2ff26ab722",
+    "MarketCommodityPrice": "9ef84268-d588-465a-a308-a864a43d0070",
+    "RainfallIndia": "ebf28620-cb69-4737-bca3-aaeffc2c57c3",
+    "DailyCommodityPrice": "35985678-0d79-46b4-9ed6-6f13308a1d24",  # Variety-wise Daily Market Prices Data of Commodity
+    # Add more resources as needed: "your_chosen_name": "UUID"
+}
+
+def ts() -> str:
+    # Imported locally; only this helper needs datetime
+    from datetime import datetime
+    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+# =============================================================================
+# STORAGE HANDLERS
+# =============================================================================
+
+class BaseStorage:
+    """Abstract base class for storage strategies."""
+    def __init__(self, filepath: str, unique_key: Optional[str]):
+        self.filepath = os.path.normpath(os.path.expanduser(filepath))
+        self.unique_key = unique_key
+        self.existing_count = 0
+        self.seen_keys: Set[Any] = set()
+        self._ensure_parent_dir()
+
+    def _ensure_parent_dir(self):
+        parent_dir = os.path.dirname(self.filepath)
+        if parent_dir:
+            os.makedirs(parent_dir, exist_ok=True)
+
+    def load(self):
+        """Load existing state to determine resume point and dedup keys."""
+        if os.path.exists(self.filepath):
+            self._load_data()
+            print(f"[{ts()}] Loaded {self.existing_count} records from existing file.")
+        else:
+            print(f"[{ts()}] No existing file found. Starting fresh.")
+
+    def _load_data(self): raise NotImplementedError
+
+    def append(self, records: List[Dict]) -> int:
+        """
+        Write records to storage.
+        Returns the number of records actually written (after deduplication).
+        """
+        if not records:
+            return 0
+
+        # Deduplication Logic
+        to_write = []
+        if self.unique_key:
+            for r in records:
+                val = r.get(self.unique_key)
+                if val and val in self.seen_keys:
+                    continue
+                if val:
+                    self.seen_keys.add(val)
+                to_write.append(r)
+        else:
+            to_write = records
+
+        if to_write:
+            self._write_batch(to_write)
+        return len(to_write)
+
+    def _write_batch(self, records: List[Dict]): raise NotImplementedError
+
+class CsvStorage(BaseStorage):
+    def _load_data(self):
+        try:
+            df = pd.read_csv(self.filepath, dtype=str)
+            self.existing_count = len(df)
+            if self.unique_key and self.unique_key in df.columns:
+                self.seen_keys = set(df[self.unique_key].dropna().unique())
+        except Exception as e:
+            print(f"[{ts()}] Warning: Failed to read CSV: {e}. Starting fresh.")
+
+    def _write_batch(self, records: List[Dict]):
+        try:
+            df_new = pd.DataFrame(records)
+            if not os.path.exists(self.filepath):
+                df_new.to_csv(self.filepath, index=False, encoding='utf-8')
+            else:
+                # Append without header
+                df_new.to_csv(self.filepath, mode='a', header=False, index=False, encoding='utf-8')
+        except Exception as e:
+            print(f"[{ts()}] Error writing CSV batch: {e}")
+
+class JsonStorage(BaseStorage):
+    def _load_data(self):
+        try:
+            with open(self.filepath, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+                if isinstance(data, list):
+                    self.existing_count = len(data)
+                    if self.unique_key:
+                        self.seen_keys = {item.get(self.unique_key) for item in data if self.unique_key in item}
+        except Exception as e:
+            print(f"[{ts()}] Warning: Failed to read JSON: {e}. Starting fresh.")
+
+    def _write_batch(self, records: List[Dict]):
+        # Standard JSON requires reading the whole file to append,
+        # which is slow for large datasets.
+        # For robustness, we do Read-Modify-Write here.
+        try:
+            file_exists = os.path.exists(self.filepath)
+            mode = 'r+' if file_exists else 'w+'
+            with open(self.filepath, mode, encoding='utf-8', newline='\n') as f:
+                data = []
+                if file_exists and os.path.getsize(self.filepath) > 0:
+                    try:
+                        data = json.load(f)
+                        if not isinstance(data, list):
+                            raise ValueError("Root not a list")
+                    except (json.JSONDecodeError, ValueError):
+                        data = []
+                data.extend(records)
+                f.seek(0)
+                json.dump(data, f, indent=2, ensure_ascii=False)
+                f.write("\n")
+                f.truncate()
+        except Exception as e:
+            print(f"[{ts()}] Error writing JSON batch: {e}")
+
+class JsonlStorage(BaseStorage):
+    """JSON Lines format: One JSON object per line. Append-only."""
+    def _load_data(self):
+        # Must read line-by-line to check for unique keys if dedup is on
+        count = 0
+        if self.unique_key:
+            try:
+                with open(self.filepath, 'r', encoding='utf-8') as f:
+                    for line in f:
+                        if not line.strip(): continue
+                        count += 1
+                        try:
+                            obj = json.loads(line)
+                            val = obj.get(self.unique_key)
+                            if val: self.seen_keys.add(val)
+                        except json.JSONDecodeError:
+                            pass
+            except Exception as e:
+                print(f"[{ts()}] Warning: Failed to read JSONL: {e}. Starting fresh.")
+        else:
+            # Without a unique key, counting non-blank lines is enough to
+            # resume appending at the correct offset.
+            try:
+                with open(self.filepath, 'r', encoding='utf-8') as f:
+                    count = sum(1 for line in f if line.strip())
+            except OSError:
+                pass
+
+        self.existing_count = count
+
+    def _write_batch(self, records: List[Dict]):
+        try:
+            with open(self.filepath, 'a', encoding='utf-8') as f:
+                for record in records:
+                    f.write(json.dumps(record, ensure_ascii=False) + "\n")
+        except Exception as e:
+            print(f"[{ts()}] Error writing JSONL batch: {e}")
+
+class MemoryBufferWrapper:
+    """
+    Wraps any BaseStorage to provide in-memory buffering.
+    Flushes to disk when buffer size is reached.
+    """
+    def __init__(self, storage_backend: BaseStorage, buffer_size: int):
+        self.storage = storage_backend
+        self.buffer_size = buffer_size
+        self.buffer: List[Dict] = []
+        self.buffer_lock = threading.Lock()
+
+        # Load existing state from the underlying backend
+        self.storage.load()
+        self.existing_count = self.storage.existing_count
+
+    def append(self, records: List[Dict]) -> int:
+        with self.buffer_lock:
+            # Dedup and add to buffer.
+            # Dedup runs against the storage backend's seen_keys set, so
+            # buffered and already-flushed records share one key space.
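+            # Records missing the unique key are never dropped (a falsy value skips dedup).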
+
+            filtered = []
+            if self.storage.unique_key:
+                for r in records:
+                    val = r.get(self.storage.unique_key)
+                    if val and val in self.storage.seen_keys:
+                        continue
+                    if val: self.storage.seen_keys.add(val)
+                    filtered.append(r)
+            else:
+                filtered = records
+
+            self.buffer.extend(filtered)
+            if len(self.buffer) >= self.buffer_size:
+                self.flush()
+            return len(filtered)
+
+    def flush(self):
+        if not self.buffer:
+            return
+        print(f"[{ts()}] Flushing {len(self.buffer)} records to disk...")
+        self.storage._write_batch(self.buffer)
+        self.buffer.clear()
+
+    def finalize(self):
+        self.flush()
+
+# =============================================================================
+# API HANDLERS
+# =============================================================================
+
+def make_session(api_key: str) -> requests.Session:
+    session = requests.Session()
+    retry = Retry(
+        total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504],
+        allowed_methods=["GET"]
+    )
+    adapter = HTTPAdapter(max_retries=retry)
+    session.mount("https://", adapter)
+    session.headers.update({
+        "User-Agent": "python-datagov-fetcher/3.0",
+        "Accept": "application/json"
+    })
+    return session
+
+def get_total_records(session: requests.Session, resource_id: str, api_key: str) -> Optional[int]:
+    url = f"{BASE_URL}{resource_id}"
+    # FIX: Use limit=0 to get count without fetching a data record.
+    # This prevents the 'count' from being hidden inside a record object.
+    params = {"api-key": api_key, "format": "json", "limit": 0, "offset": 0}
+    try:
+        resp = session.get(url, params=params, timeout=30)
+        if resp.status_code == 200:
+            data = resp.json()
+            # Fallbacks for different API response structures
+            count = data.get("count") or data.get("total") or data.get("total_records")
+            if isinstance(count, int):
+                return count
+    except Exception as e:
+        print(f"[{ts()}] Failed to fetch total count: {e}")
+    return None
+
+def fetch_batch(session: requests.Session, resource_id: str, api_key: str, offset: int, limit: int) -> List[Dict]:
+    url = f"{BASE_URL}{resource_id}"
+    params = {"api-key": api_key, "format": "json", "limit": limit, "offset": offset}
+
+    for attempt in range(1, REQUEST_RETRIES + 1):
+        try:
+            resp = session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
+            resp.raise_for_status()
+            data = resp.json()
+
+            # Handle edge case where the API returns a bare list directly
+            if isinstance(data, list): return data
+            records = data.get("records") or data.get("fields", [])
+            if isinstance(records, list): return records
+            return []
+        except Exception as e:
+            sleep = SLEEP_BASE * attempt + random.random()
+            print(f"[{ts()}] Offset {offset} failed (attempt {attempt}): {e}. Retrying in {sleep:.1f}s")
+            time.sleep(sleep)
+    return []
+
+# =============================================================================
+# MAIN ORCHESTRATOR
+# =============================================================================
+
+def resolve_resource_id(name_or_id: str) -> str:
+    """Resolves a resource name to an ID, or returns the ID if it looks like a UUID."""
+    if name_or_id in KNOWN_RESOURCES:
+        rid = KNOWN_RESOURCES[name_or_id]
+        print(f"[{ts()}] Resolved resource '{name_or_id}' to ID: {rid}")
+        return rid
+    # Heuristic: if it contains dashes and looks like a UUID, assume it is
+    if "-" in name_or_id and len(name_or_id) > 30:
+        print(f"[{ts()}] Using provided Resource ID directly: {name_or_id}")
+        return name_or_id
+    raise ValueError(f"Resource '{name_or_id}' not found in KNOWN_RESOURCES and doesn't look like an ID.")
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Robust Data.gov.in Downloader",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="Available Resources: " + ", ".join(KNOWN_RESOURCES.keys())
+    )
+
+    # Positional
+    parser.add_argument("resource", help="Resource Name (e.g. VillageSHG) or Resource ID (UUID)")
+
+    # Auth
+    load_dotenv()  # this will read .env and set environment variables
+    parser.add_argument("--api-key", default=os.getenv("datagov-api-key"), help="API Key (env: datagov-api-key)")
+
+    # Output
+    parser.add_argument("--output", "-o", help="Output file path (default: <resource>.<format>)")
+    parser.add_argument("--format", choices=["csv", "json", "jsonl"], default="jsonl",
+                        help="Format (default: jsonl - recommended for speed)")
+
+    # Behavior
+    parser.add_argument("--unique-key", help="Field to deduplicate by (e.g., 'sno', 'shg_id')")
+    parser.add_argument("--in-memory", action="store_true",
+                        help="Keep data in memory buffer and flush periodically (safer for mid-script crashes)")
+    parser.add_argument("--buffer-size", type=int, default=5000,
+                        help="Records to buffer before flushing to disk (default: 5000)")
+
+    # Performance
+    parser.add_argument("--limit", type=int, default=DEFAULT_LIMIT, help="Records per API request")
+    parser.add_argument("--workers", type=int, default=1, help="Parallel threads (default: 1)")
+    parser.add_argument("--ignore-total", action="store_true", help="Ignore API total count and download until empty")
+
+    args = parser.parse_args()
+
+    if not args.api_key:
+        sys.exit("Error: API Key missing. Set env var or use --api-key")
+
+    # 1. Resolve Resource
+    try:
+        resource_id = resolve_resource_id(args.resource)
+    except ValueError as e:
+        sys.exit(str(e))
+
+    # 2. Determine Output Filename
+    if not args.output:
+        args.output = f"{args.resource}.{args.format}"
+
+    # 3. Setup Storage
+    print(f"[{ts()}] Initializing storage: {args.format} -> {args.output}")
+    if args.format == 'jsonl':
+        storage_backend = JsonlStorage(args.output, args.unique_key)
+    elif args.format == 'json':
+        storage_backend = JsonStorage(args.output, args.unique_key)
+    else:  # csv
+        storage_backend = CsvStorage(args.output, args.unique_key)
+
+
+    # 4. Wrap in Memory Buffer if requested
+    if args.in_memory:
+        storage = MemoryBufferWrapper(storage_backend, args.buffer_size)
+        print(f"[{ts()}] In-Memory Buffering enabled (Flush every {args.buffer_size} records).")
+    else:
+        storage = storage_backend
+
+    # 5. Load Existing State (MemoryBufferWrapper already loads in __init__)
+    if not args.in_memory: storage.load()
+    start_offset = storage.existing_count
+    print(f"[{ts()}] Resuming from offset: {start_offset}")
+
+    # 6. API Setup
+    session = make_session(args.api_key)
+    total = None
+    if not args.ignore_total:
+        total = get_total_records(session, resource_id, args.api_key)
+
+    # Logic to handle bad totals (e.g. if API returns 1 but there are millions)
+    if total and total < start_offset:
+        print(f"[{ts()}] WARNING: API Total ({total}) is less than existing data ({start_offset}).")
+        print(f"[{ts()}] Ignoring API Total and resuming.")
+        total = None
+    elif total:
+        print(f"[{ts()}] Total records reported: {total}")
+    else:
+        print(f"[{ts()}] Total records unknown (or ignored). Will download until empty batch.")
+
+    pbar = tqdm(total=total, initial=start_offset, unit="rec", desc="Downloading")
+
+    stop_flag = False
+    next_offset = start_offset
+    # Max pending futures to prevent memory explosion if API is super fast
+    MAX_PENDING_FUTURES = args.workers * 2
+
+    thread_lock = threading.Lock()
+
+    def process_batch(offset: int) -> tuple[int, List[Dict]]:
+        records = fetch_batch(session, resource_id, args.api_key, offset, args.limit)
+        with thread_lock:
+            added = storage.append(records)  # serialize writes; backends aren't thread-safe
+            pbar.update(added)
+        return offset, records
+
+    with ThreadPoolExecutor(max_workers=args.workers) as executor:
+        futures = {}
+
+        while not stop_flag:
+            # 1. Submit new tasks
+            # Keep the pool topped up to MAX_PENDING_FUTURES so that workers
+            # actually run in parallel; stop submitting once a known total
+            # has been reached.
+
+            can_submit = True
+            while can_submit and len(futures) < MAX_PENDING_FUTURES:
+                with thread_lock:
+                    can_submit = total is None or next_offset < total
+                    if can_submit:
+                        offset_to_fetch = next_offset
+                        next_offset += args.limit
+
+                if not can_submit:
+                    break
+
+                future = executor.submit(process_batch, offset_to_fetch)
+                futures[future] = offset_to_fetch
+
+            # 2. Wait for completion
+            # as_completed blocks until at least one future finishes; no
+            # timeout is needed because we always want that first result.
+            if not futures:
+                # Nothing pending and nothing left to submit: we are done.
+                if not can_submit:
+                    break
+                # No futures but submission is still possible (transient state);
+                # sleep briefly to avoid a tight spin, then retry.
+                time.sleep(0.1)
+                continue
+
+            # Wait for ANY future to complete
+            completed_set = as_completed(futures)
+
+            # as_completed yields futures as they finish; take the first one,
+            # handle it, then break so the outer loop can re-check stop_flag
+            # and top up the submission queue.
+            for f in completed_set:
+                offset = futures.pop(f)
+                try:
+                    fetched_offset, records = f.result()
+
+                    if not records:
+                        # An empty batch on data.gov.in almost always means
+                        # offset >= total, so signal the outer loop to stop
+                        # once pending futures drain.
+                        print(f"[{ts()}] Empty batch received at offset {fetched_offset}. Finishing up...")
+                        stop_flag = True
+
+                except Exception as e:
+                    print(f"[{ts()}] Error in future {offset}: {e}")
+
+                # Break inner loop to check stop_flag at top of while
+                break
+
+    pbar.close()
+    if hasattr(storage, 'finalize'):
+        storage.finalize()
+
+    print(f"[{ts()}] Finished. 
Data saved to {args.output}") + +if __name__ == "__main__": + main() From 125daef5502a20686bed405db503e6f636615a48 Mon Sep 17 00:00:00 2001 From: amit-spatial Date: Wed, 1 Apr 2026 21:51:38 +0000 Subject: [PATCH 2/3] added general script to quickly download data.gov data --- public_api/api.py | 268 --------------------------------------------- public_api/urls.py | 5 - 2 files changed, 273 deletions(-) diff --git a/public_api/api.py b/public_api/api.py index 7052327d..795a8dfa 100644 --- a/public_api/api.py +++ b/public_api/api.py @@ -2,7 +2,6 @@ from rest_framework.response import Response from rest_framework import status from django.http import JsonResponse -from django.db.models import Q from utilities.gee_utils import ( valid_gee_text, ) @@ -20,10 +19,6 @@ get_mws_geometries_data, get_village_geometries_data, ) -from computing.models import Layer, Dataset, LayerType -from geoadmin.models import StateSOI, DistrictSOI, TehsilSOI -from stats_generator.utils import get_url -from nrm_app.settings import GEOSERVER_URL from utilities.auth_check_decorator import api_security_check from drf_yasg.utils import swagger_auto_schema from .swagger_schemas import ( @@ -492,266 +487,3 @@ def get_village_geometries(request): {"error": f"Internal server error: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) - - -############# Get Layer Manifest for GeoNode/QGIS ################## -@api_security_check(auth_type="API_key") -def get_layer_manifest(request): - """ - Return a GeoNode/QGIS-ready manifest of all layers. - - Query parameters: - - state: Filter by state (optional) - - district: Filter by district (optional) - - tehsil: Filter by tehsil/block (optional) - - all_active: Return all active locations (optional, boolean) - - format: Output format - 'json' or 'csv' (default: json) - """ - from datetime import datetime, timezone - from urllib.parse import parse_qs, urlparse - from django.http import HttpResponse - import csv - from io import StringIO - - def raster_tiff_download_url(workspace, layer_name): - return ( - f"{GEOSERVER_URL}/{workspace}/wcs?service=WCS&version=2.0.1" - f"&request=GetCoverage&CoverageId={workspace}:{layer_name}" - f"&format=geotiff&compression=LZW&tiling=true&tileheight=256&tilewidth=256" - ) - - def infer_service_details(layer_url): - if not layer_url: - return { - "service": "", "workspace": "", "resource_name": "", - "resource_identifier": "", "geoserver_root": "", "ows_url": "", "wms_url": "", - } - parsed = urlparse(layer_url) - query_params = parse_qs(parsed.query) - - path = parsed.path or "" - marker_index = path.find("/geoserver") - if marker_index == -1: - geoserver_root = f"{parsed.scheme}://{parsed.netloc}" - else: - prefix = path[: marker_index + len("/geoserver")] - geoserver_root = f"{parsed.scheme}://{parsed.netloc}{prefix}" - - service = query_params.get("service", [""])[0].upper() - workspace = "" - resource_name = "" - resource_identifier = "" - - if "typeName" in query_params and query_params["typeName"]: - resource_identifier = query_params["typeName"][0] - if ":" in resource_identifier: - workspace, resource_name = resource_identifier.split(":", 1) - elif "CoverageId" in query_params and query_params["CoverageId"]: - resource_identifier = query_params["CoverageId"][0] - if ":" in resource_identifier: - workspace, resource_name = resource_identifier.split(":", 1) - - ows_url = f"{geoserver_root}/{workspace}/ows" if workspace else "" - wms_url = f"{geoserver_root}/wms" if geoserver_root else "" - - return { - "service": service, "workspace": 
workspace, - "resource_name": resource_name, "resource_identifier": resource_identifier, - "geoserver_root": geoserver_root, "ows_url": ows_url, "wms_url": wms_url, - } - - def infer_qgis_provider(layer_type, service): - lt = str(layer_type).strip().lower() - sv = str(service).strip().lower() - if lt in {"vector", "point"} or sv == "wfs": - return "WFS" - if lt == "raster" or sv == "wcs": - return "WCS" - if sv == "wms": - return "WMS" - return "" - - def infer_download_format(provider): - p = str(provider).strip().lower() - if p == "wfs": - return "GeoJSON" - if p == "wcs": - return "GeoTIFF" - if p == "wms": - return "Rendered map image" - return "" - - def infer_style_format(style_url): - lowered = str(style_url).lower() - if lowered.endswith(".qml"): - return "QML" - if lowered.endswith(".sld"): - return "SLD" - if lowered.endswith(".json"): - return "JSON" - return "" - - try: - state = request.query_params.get("state", "").lower() - district = request.query_params.get("district", "").lower() - tehsil = request.query_params.get("tehsil", "").lower() - all_active = request.query_params.get("all_active", "").lower() == "true" - output_format = request.query_params.get("format", "json").lower() - - # Build base queryset - layers_qs = Layer.objects.select_related( - "dataset", "state", "district", "block" - ).filter(is_sync_to_geoserver=True) - - # Filter by location - locations = [] - if all_active: - locations = ( - layers_qs.values("state__state_name", "district__district_name", "block__tehsil_name") - .distinct() - ) - locations = [ - { - "state": loc["state__state_name"], - "district": loc["district__district_name"], - "tehsil": loc["block__tehsil_name"], - } - for loc in locations - ] - elif state and district and tehsil: - locations = [{"state": state, "district": district, "tehsil": tehsil}] - else: - locations = [{"state": state, "district": district, "tehsil": tehsil}] - - # Collect layer records - all_records = [] - EXCLUDE_KEYWORDS = ["run_off", "evapotranspiration", "precipitation"] - - for loc in locations: - filters = Q(is_sync_to_geoserver=True) - if loc.get("state"): - filters &= Q(state__state_name__iexact=loc["state"]) - if loc.get("district"): - filters &= Q(district__district_name__iexact=loc["district"]) - if loc.get("tehsil"): - filters &= Q(block__tehsil_name__iexact=loc["tehsil"]) - - for kw in EXCLUDE_KEYWORDS: - filters &= ~Q(layer_name__icontains=kw) - - location_layers = layers_qs.filter(filters).order_by("layer_name", "-layer_version") - - # Deduplicate - seen = {} - for layer in location_layers: - name = layer.layer_name.lower() - if name not in seen: - seen[name] = layer - else: - cv = float(seen[name].layer_version or 0) - nv = float(layer.layer_version or 0) - if nv > cv: - seen[name] = layer - - for layer in seen.values(): - dataset = layer.dataset - workspace = dataset.workspace or "" - layer_type = dataset.layer_type or "" - layer_name = layer.layer_name or "" - - # Get style URLs - style_url = "" - sld_url = "" - if dataset.misc: - style_url = dataset.misc.get("style_url", "") - sld_url = dataset.misc.get("sld_url", "") - - # Generate layer URL - if layer_type in [LayerType.VECTOR, LayerType.POINT]: - layer_url = get_url(workspace, layer_name) - elif layer_type == LayerType.RASTER: - layer_url = raster_tiff_download_url(workspace, layer_name) - else: - layer_url = "" - - service_details = infer_service_details(layer_url) - qgis_provider = infer_qgis_provider(layer_type, service_details["service"]) - - all_records.append({ - "state": 
layer.state.state_name.lower() if layer.state else "", - "district": layer.district.district_name.lower() if layer.district else "", - "tehsil": layer.block.tehsil_name.lower() if layer.block else "", - "dataset_name": dataset.name or "", - "layer_name": layer_name, - "layer_type": layer_type, - "layer_version": layer.layer_version or "", - "layer_url": layer_url, - "style_url": style_url, - "sld_url": sld_url, - "style_format": infer_style_format(style_url), - "sld_format": "SLD" if sld_url else "", - "gee_asset_path": layer.gee_asset_path or "", - "service_type": service_details["service"], - "workspace": service_details["workspace"], - "resource_identifier": service_details["resource_identifier"], - "resource_name": service_details["resource_name"] or layer_name, - "geoserver_root": service_details["geoserver_root"], - "ows_url": service_details["ows_url"], - "wms_url": service_details["wms_url"], - "qgis_provider": qgis_provider, - "download_format": infer_download_format(qgis_provider), - "geonode_publish_strategy": "remote-service-from-geoserver", - }) - - # Build summary - unique_wms = sorted(set(r["wms_url"] for r in all_records if r.get("wms_url"))) - unique_ws = sorted(set(r["workspace"] for r in all_records if r.get("workspace"))) - type_counts = {} - for r in all_records: - lt = r.get("layer_type", "unknown") or "unknown" - type_counts[lt] = type_counts.get(lt, 0) + 1 - - manifest = { - "generated_at": datetime.now(timezone.utc).isoformat(), - "source": "django_database", - "scope": { - "all_active_locations": all_active, - "requested_state": state, - "requested_district": district, - "requested_tehsil": tehsil, - "location_count": len(locations), - }, - "summary": { - "layer_count": len(all_records), - "layer_type_counts": type_counts, - "unique_workspaces": unique_ws, - "unique_wms_urls": unique_wms, - }, - "locations": locations, - "layers": all_records, - } - - if output_format == "csv": - if not all_records: - return Response( - {"error": "No layers found for the specified location(s)"}, - status=status.HTTP_404_NOT_FOUND, - ) - fieldnames = list(all_records[0].keys()) - output = StringIO() - writer = csv.DictWriter(output, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(all_records) - - response = HttpResponse(output.getvalue(), content_type="text/csv") - response["Content-Disposition"] = "attachment; filename=core_stack_manifest.csv" - return response - - return Response(manifest, status=status.HTTP_200_OK) - - except Exception as e: - print(f"Error in get_layer_manifest: {str(e)}") - return Response( - {"status": "error", "message": str(e)}, - status=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) diff --git a/public_api/urls.py b/public_api/urls.py index 58b5b786..8fac1f2b 100644 --- a/public_api/urls.py +++ b/public_api/urls.py @@ -32,9 +32,4 @@ api.get_village_geometries, name="get-village-geometries", ), - path( - "get_layer_manifest/", - api.get_layer_manifest, - name="get-layer-manifest", - ), ] From 0093d675f26e0250edacd6dd44fbf529767316c6 Mon Sep 17 00:00:00 2001 From: amit-spatial Date: Wed, 1 Apr 2026 22:07:18 +0000 Subject: [PATCH 3/3] refined example usage --- utilities/scripts/fetch_datagov.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utilities/scripts/fetch_datagov.py b/utilities/scripts/fetch_datagov.py index bb6a0a8c..4f0a65e8 100644 --- a/utilities/scripts/fetch_datagov.py +++ b/utilities/scripts/fetch_datagov.py @@ -5,7 +5,7 @@ FEATURES: --------- -- Resource Dictionary: Use short names (e.g., 'VillageSHG') instead 
of UUIDs.
+- Resource Dictionary: Use short names (e.g., 'lgd_village') instead of UUIDs.
 - Parallel Fetching: Multi-threaded downloads for speed.
 - Storage Backends: CSV, JSON, and JSONL (JSON Lines).
 - In-Memory Buffering: Option to keep data in RAM and flush periodically (safer for crashes).
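
For quick sanity checks, the storage and fetch helpers can also be driven
directly from Python rather than through the CLI. A minimal sketch, assuming
utilities/scripts is on PYTHONPATH and DATAGOV_KEY is a hypothetical
environment variable holding a valid key:

    import os
    from fetch_datagov import KNOWN_RESOURCES, JsonlStorage, make_session, fetch_batch

    api_key = os.environ["DATAGOV_KEY"]          # hypothetical env var for this sketch
    resource_id = KNOWN_RESOURCES["lgd_state"]   # small resource, fetches quickly

    storage = JsonlStorage("lgd_state.jsonl", unique_key=None)
    storage.load()                               # resume if the file already exists

    session = make_session(api_key)
    offset = storage.existing_count
    while True:
        batch = fetch_batch(session, resource_id, api_key, offset, limit=500)
        if not batch:                            # empty batch: past the end (or repeated failures)
            break
        storage.append(batch)
        offset += 500

This mirrors what main() does sequentially, minus the progress bar and the
thread pool, which is often enough when embedding a one-off download in a
larger pipeline.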