From fc20ade73f341e73587ea1421f20d3419b99f6b4 Mon Sep 17 00:00:00 2001 From: "kapil.dadheech@gramvaani.org" Date: Mon, 23 Mar 2026 13:17:01 +0530 Subject: [PATCH 1/2] Fix GeoServer sync for multipart GeoTIFFs and chunk ZOI NDVI export. Improve robustness of raster/vector exports and make waterbody desilting regeneration configurable. Handle large LULC/ZOI multipart outputs; add optional desilting point regeneration behavior. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- computing/drought/merge_layers.py | 79 ++++++++++------- computing/lulc/lulc_v3.py | 2 +- computing/zoi_layers/zoi3.py | 74 ++++++++++++++-- utilities/gee_utils.py | 133 +++++++++++++++++++++++++-- waterrejuvenation/models.py | 13 +-- waterrejuvenation/tasks.py | 143 +++++++++++++++++++----------- waterrejuvenation/tests.py | 87 +++++++++++++++++- waterrejuvenation/views.py | 108 ++++++++++++---------- 8 files changed, 482 insertions(+), 157 deletions(-) diff --git a/computing/drought/merge_layers.py b/computing/drought/merge_layers.py index 29784850..4bf2c79c 100644 --- a/computing/drought/merge_layers.py +++ b/computing/drought/merge_layers.py @@ -59,31 +59,30 @@ def merge_yearly_layers( print(asset_suffix) print(asset_folder_list) print(f"merge yearly layers {app_type}") - # Create required GEE asset path components + ee_initialize(gee_account_id) gee_obj = GEEAccount.objects.get(pk=gee_account_id) - helper_account_path = build_gee_helper_paths(app_type, gee_obj.helper_account.name) - # Create export asset path (must be constant for export) - description = f"drought_{asset_suffix}" # _{start_year}_{end_year}" - print(description) + + description = f"drought_{asset_suffix}" + gee_asset = get_gee_dir_path( asset_folder_list, asset_path=GEE_PATHS[app_type]["GEE_ASSET_PATH"] ) asset_id = f"{gee_asset}{description}" + + print(description) print(asset_id) - # Check if asset already exists if is_gee_asset_exists(asset_id): ee.data.deleteAsset(asset_id) - # return None def get_collection_path(year: int) -> str: - """Get the full path for a year's collection.""" asset_path = GEE_PATHS[app_type]["GEE_ASSET_PATH"] return f"{get_gee_dir_path(asset_folder_list, asset_path=asset_path)}drought_{asset_suffix}_{year}_v2" - # Get base feature collection + # Base collection first_year_fc = ee.FeatureCollection(get_collection_path(start_year)) + geometries_with_ids = first_year_fc.map( lambda f: ee.Feature( f.geometry(), @@ -96,20 +95,25 @@ def get_collection_path(year: int) -> str: ) def merge_year_data(feature): - """Server-side function to merge data from all years.""" uid = feature.get("uid") def process_year(prev_feature, year): - """Process a single year's data.""" feat = ee.Feature(prev_feature) - # Get year's collection year_fc = ee.FeatureCollection(get_collection_path(year)) + + filtered = year_fc.filter(ee.Filter.equals("uid", uid)) + + # SAFE feature: fall back to an empty feature when this uid has no row for the year year_feature = ee.Feature( - year_fc.filter(ee.Filter.equals("uid", uid)).first() + ee.Algorithms.If( + filtered.size().gt(0), filtered.first(), ee.Feature(None, {}) + ) ) - # Base property names + # ------------------------- + # Property lists + # ------------------------- base_props = [ "drought_labels_" + str(year), "dryspell_length_" + str(year), @@ -131,7 +135,6 @@ def process_year(prev_feature, year): "total_weeks_" + str(year), ] - # Base property names renamed_props = [ "drlb_" +
str(year), "drysp_" + str(year), @@ -153,19 +156,23 @@ def process_year(prev_feature, year): "t_wks_" + str(year), ] - prop_names = ee.List(base_props) - - renamed_prop_names = ee.List(renamed_props) + # ------------------------- + # SAFE getter + # ------------------------- + def safe_get(prop): + return ee.Algorithms.If( + year_feature.get(prop), year_feature.get(prop), None + ) - # Get property values - prop_values = prop_names.map(lambda x: year_feature.get(x)) + prop_values = ee.List(base_props).map(lambda x: safe_get(x)) - # Create base properties dictionary - base_dict = ee.Dictionary.fromLists(renamed_prop_names, prop_values) + base_dict = ee.Dictionary.fromLists(ee.List(renamed_props), prop_values) - all_properties = ee.Feature(year_feature).propertyNames() + # ------------------------- + # Rainfall properties + # ------------------------- + all_properties = year_feature.propertyNames() - # Filter properties that start with the prefix orig_rainfall_names = all_properties.filter( ee.Filter.stringStartsWith("item", "monthly_rainfall_deviation_") ) @@ -175,19 +182,25 @@ def rename_property(prop): renamed_rainfall_names = orig_rainfall_names.map(rename_property) - # Get rainfall values - rainfall_values = orig_rainfall_names.map(lambda x: year_feature.get(x)) + rainfall_values = orig_rainfall_names.map(lambda x: safe_get(x)) - # Create rainfall properties dictionary rainfall_dict = ee.Dictionary.fromLists( renamed_rainfall_names, rainfall_values ) - # Adding dry_spell length values from previous years with current year - total_dryspell = ee.Number(feat.get("avg_dryspell")).add( - ee.Number(year_feature.get("dryspell_length_" + str(year))) + # ------------------------- + # SAFE dryspell + # ------------------------- + dryspell_val = ee.Number( + ee.Algorithms.If( + year_feature.get("dryspell_length_" + str(year)), + year_feature.get("dryspell_length_" + str(year)), + 0, + ) ) + total_dryspell = ee.Number(feat.get("avg_dryspell")).add(dryspell_val) + return ( feat.set(base_dict) .set(rainfall_dict) @@ -196,21 +209,19 @@ def rename_property(prop): feature = feature.set("avg_dryspell", 0) - # Process all years years = range(start_year, end_year + 1) processed_feature = reduce(process_year, years, feature) num_years = ee.Number(end_year).subtract(start_year).add(1) + avg_dryspell = ee.Number(processed_feature.get("avg_dryspell")).divide( num_years ) - # Set average dryspell and remove total return processed_feature.set("avg_dryspell", avg_dryspell) - # Process all features merged_fc = geometries_with_ids.map(merge_year_data) - # Export feature collection to GEE task_id = export_vector_asset_to_gee(merged_fc, description, asset_id) + return task_id diff --git a/computing/lulc/lulc_v3.py b/computing/lulc/lulc_v3.py index e2b2200f..36af459d 100644 --- a/computing/lulc/lulc_v3.py +++ b/computing/lulc/lulc_v3.py @@ -124,7 +124,7 @@ def clip_lulc_v3( pan_india = ee.Image( f"projects/corestack-datasets/assets/datasets/LULC_v3_river_basin/pan_india_lulc_v3_{curr_start_year}_{curr_end_year}" ) - clipped_lulc = pan_india.clip(roi.geometry()) + clipped_lulc = pan_india.clipToCollection(roi.geometry()) l1_asset_new.append(clipped_lulc) task_list = [] diff --git a/computing/zoi_layers/zoi3.py b/computing/zoi_layers/zoi3.py index fea3b962..d3b99503 100644 --- a/computing/zoi_layers/zoi3.py +++ b/computing/zoi_layers/zoi3.py @@ -8,11 +8,26 @@ valid_gee_text, get_gee_dir_path, is_gee_asset_exists, + check_task_status, + export_vector_asset_to_gee, ) from waterrejuvenation.utils import 
wait_for_task_completion import ee +def _build_fc_chunks(fc, chunk_size): + size = fc.size().getInfo() + parts = size // chunk_size + chunks = [] + for part in range(parts + 1): + start = part * chunk_size + end = start + chunk_size + chunk = ee.FeatureCollection(fc.toList(fc.size()).slice(start, end)) + if chunk.size().getInfo() > 0: + chunks.append((part, chunk)) + return chunks + + def get_ndvi_for_zoi( state=None, district=None, @@ -56,12 +71,59 @@ def get_ndvi_for_zoi( ) zoi_collections = ee.FeatureCollection(asset_id_zoi) - fc = get_ndvi_data(zoi_collections, 2017, 2024, description_ndvi, ndvi_asset_path) - task = ee.batch.Export.table.toAsset( - collection=fc, description=description_ndvi, assetId=ndvi_asset_path - ) - task.start() - wait_for_task_completion(task) + total_features = zoi_collections.size().getInfo() + print(f"Total ZOI features for NDVI: {total_features}") + + # For larger feature collections, split into chunks and merge chunk assets. + # This avoids export failures for heavy NDVI computations. + chunk_size = 40 + if total_features > chunk_size: + print(f"Running chunked NDVI generation with chunk size {chunk_size}") + chunk_assets = [] + chunk_merge_task_ids = [] + chunks = _build_fc_chunks(zoi_collections, chunk_size) + + for part, chunk_fc in chunks: + chunk_suffix = f"{description_ndvi}_chunk_{part}" + chunk_asset_path = f"{ndvi_asset_path}_chunk_{part}" + + if is_gee_asset_exists(chunk_asset_path): + ee.data.deleteAsset(chunk_asset_path) + + chunk_result_fc = get_ndvi_data( + chunk_fc, 2017, 2024, chunk_suffix, chunk_asset_path + ) + task_id = export_vector_asset_to_gee( + chunk_result_fc, chunk_suffix, chunk_asset_path + ) + if task_id: + chunk_merge_task_ids.append(task_id) + chunk_assets.append(chunk_asset_path) + + if chunk_merge_task_ids: + check_task_status(chunk_merge_task_ids) + + if not chunk_assets: + raise Exception("No NDVI chunk assets were generated for merge.") + + merged_fc = ee.FeatureCollection(chunk_assets).flatten() + if is_gee_asset_exists(ndvi_asset_path): + ee.data.deleteAsset(ndvi_asset_path) + final_task = ee.batch.Export.table.toAsset( + collection=merged_fc, description=description_ndvi, assetId=ndvi_asset_path + ) + final_task.start() + wait_for_task_completion(final_task) + fc = ee.FeatureCollection(ndvi_asset_path) + else: + fc = get_ndvi_data(zoi_collections, 2017, 2024, description_ndvi, ndvi_asset_path) + task = ee.batch.Export.table.toAsset( + collection=fc, description=description_ndvi, assetId=ndvi_asset_path + ) + task.start() + wait_for_task_completion(task) + fc = ee.FeatureCollection(ndvi_asset_path) + start_date = "30-06-2017" end_date = "01-07-2024" if state and district and block: diff --git a/utilities/gee_utils.py b/utilities/gee_utils.py index a8d4cea1..273fa052 100644 --- a/utilities/gee_utils.py +++ b/utilities/gee_utils.py @@ -18,6 +18,7 @@ import re import json import subprocess +import shutil from google.cloud import storage from google.api_core import retry from utilities.geoserver_utils import Geoserver @@ -417,10 +418,123 @@ def sync_raster_gcs_to_geoserver(workspace, gcs_file_name, layer_name, style_nam geo.delete_raster_store(workspace=workspace, store=layer_name) bucket = gcs_config() - blob = bucket.blob("nrm_raster/" + gcs_file_name + ".tif") - tif_content = blob.download_as_bytes() + # Earth Engine may export large GeoTIFFs as multiple files (e.g. "-part-0.tif"). + # GeoServer expects a single GeoTIFF, so we download and merge parts when needed. 
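Earth Engine's shard names embed a numeric index, and a plain lexicographic sort places "part-10" before "part-2" once an export spills past ten shards. A minimal, standalone sketch of index-aware ordering; the file names are hypothetical stand-ins for the two naming styles described above:

import re

# Hypothetical shard names covering both naming styles noted above.
names = ["x-part-10.tif", "x-part-2.tif", "x.tif-part-0", "x-part-1.tif"]

def part_index(name):
    # Pull out the trailing shard number; unnumbered blobs sort first.
    m = re.search(r"-part-(\d+)", name)
    return int(m.group(1)) if m else -1

print(sorted(names, key=part_index))
# ['x.tif-part-0', 'x-part-1.tif', 'x-part-2.tif', 'x-part-10.tif']

Because the shards are disjoint tiles, merge order mainly affects reproducibility rather than pixel values, but a numeric key keeps the ordering stable beyond ten parts.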
+ expected_blob_name = f"nrm_raster/{gcs_file_name}.tif" + expected_blob = bucket.blob(expected_blob_name) - file_upload_res = geo.upload_raster(tif_content, workspace, layer_name) + merge_tmp_root = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "data", "gdal_merge_tmp" + ) + os.makedirs(merge_tmp_root, exist_ok=True) + + with tempfile.TemporaryDirectory( + prefix="geoserver_raster_", dir=merge_tmp_root + ) as tmpdir: + if expected_blob.exists(): + local_tif_path = os.path.join( + tmpdir, os.path.basename(expected_blob_name) + ) + expected_blob.download_to_filename(local_tif_path) + with open(local_tif_path, "rb") as f: + file_upload_res = geo.upload_raster(f, workspace, layer_name) + else: + prefix = f"nrm_raster/{gcs_file_name}" + part_blobs = [] + # list_blobs(prefix=...) is bounded because we keep a tight prefix. + # Earth Engine naming can vary for large exports, so we match both: + # - "-part-0.tif" style + # - ".tif-part-0" style + candidate_prefixes = [prefix, expected_blob_name] + seen_names = set() + for candidate_prefix in candidate_prefixes: + for b in bucket.list_blobs(prefix=candidate_prefix): + if b.name == expected_blob_name: + continue + if ".tif" in b.name.lower(): + if b.name not in seen_names: + seen_names.add(b.name) + part_blobs.append(b) + + if not part_blobs: + raise FileNotFoundError( + f"Expected GeoTIFF not found in GCS: '{expected_blob_name}'. " + f"No part files found with prefix '{prefix}'." + ) + print( + f"Expected GeoTIFF missing; found {len(part_blobs)} part file(s) for '{gcs_file_name}'. Merging before GeoServer upload." + ) + + # Deterministic merge order. + part_blobs.sort(key=lambda b: b.name) + local_part_paths = [] + # Use short local filenames to avoid Windows path-length issues with GDAL. + for idx, b in enumerate(part_blobs): + local_part_path = os.path.join(tmpdir, f"part_{idx}.tif") + b.download_to_filename(local_part_path) + local_part_paths.append(local_part_path) + + merged_tif_path = os.path.join(tmpdir, "merged.tif") + + # Prefer gdal_merge.py if available; fall back to VRT+translate otherwise. + gdal_merge_py = shutil.which("gdal_merge.py") + if gdal_merge_py: + cmd = ["gdal_merge.py", "-o", merged_tif_path] + local_part_paths + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError( + f"gdal_merge.py failed (exit={proc.returncode}): {proc.stderr or proc.stdout}" + ) + else: + # Build VRT then translate into a single GeoTIFF. + vrt_path = os.path.join(tmpdir, "merged.vrt") + gdalbuildvrt = shutil.which("gdalbuildvrt") + gdal_translate = shutil.which("gdal_translate") + if not gdalbuildvrt or not gdal_translate: + raise RuntimeError( + "GDAL tools not found. Need either 'gdal_merge.py' or both " + "'gdalbuildvrt' and 'gdal_translate' on PATH." + ) + proc_vrt = subprocess.run( + [gdalbuildvrt, vrt_path] + local_part_paths, + capture_output=True, + text=True, + ) + if proc_vrt.returncode != 0: + raise RuntimeError( + f"gdalbuildvrt failed (exit={proc_vrt.returncode}): {proc_vrt.stderr or proc_vrt.stdout}" + ) + proc_tr = subprocess.run( + [ + gdal_translate, + "-of", + "GTiff", + "-co", + "TILED=YES", + vrt_path, + merged_tif_path, + ], + capture_output=True, + text=True, + ) + if proc_tr.returncode != 0: + raise RuntimeError( + f"gdal_translate failed (exit={proc_tr.returncode}): {proc_tr.stderr or proc_tr.stdout}" + ) + + if not os.path.exists(merged_tif_path): + # Extra guard: sometimes GDAL returns non-zero or fails to create the file. 
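The merge path above shells out to GDAL's command-line tools. Where the osgeo Python bindings happen to be installed, the same VRT-then-translate fallback can run in-process; a sketch under that assumption (paths and the helper name are illustrative):

from osgeo import gdal

gdal.UseExceptions()  # raise Python exceptions instead of returning None

def merge_parts(part_paths, merged_tif_path):
    # Build a VRT mosaic over the shards, then materialize one tiled GeoTIFF.
    vrt_path = merged_tif_path.replace(".tif", ".vrt")
    vrt = gdal.BuildVRT(vrt_path, part_paths)
    gdal.Translate(merged_tif_path, vrt, format="GTiff", creationOptions=["TILED=YES"])
    vrt = None  # close/flush the VRT dataset
    return merged_tif_path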
+ # Provide the tempdir listing to help diagnose quickly. + tmp_files = [] + for name in os.listdir(tmpdir): + tmp_files.append(name) + raise FileNotFoundError( + f"GDAL merge completed but output is missing: '{merged_tif_path}'. " + f"Tempdir contains: {tmp_files}" + ) + + with open(merged_tif_path, "rb") as f: + file_upload_res = geo.upload_raster(f, workspace, layer_name) print("File response:", file_upload_res) if style_name: style_res = geo.publish_style( @@ -453,8 +567,17 @@ def upload_tif_to_gcs(gcs_file_name, local_file_path): def gcs_file_exists(layer_name): bucket = gcs_config() - blob = bucket.blob(f"nrm_raster/{layer_name}.tif") - return blob.exists() + expected_blob = bucket.blob(f"nrm_raster/{layer_name}.tif") + if expected_blob.exists(): + return True + + # Earth Engine may export large GeoTIFFs as multiple "-part-*.tif" objects. + # In that case, the exact ".tif" name won't exist, but we still want to treat + # the raster as "exported" for the next pipeline step. + prefix = f"nrm_raster/{layer_name}" + for _ in bucket.list_blobs(prefix=prefix): + return True + return False def upload_tif_from_gcs_to_gee(gcs_path, asset_id, scale): diff --git a/waterrejuvenation/models.py b/waterrejuvenation/models.py index 3f4397fb..8aa2226d 100644 --- a/waterrejuvenation/models.py +++ b/waterrejuvenation/models.py @@ -73,18 +73,7 @@ def save(self, *args, **kwargs): super().save(*args, **kwargs) print(f"is processing required: {self.is_processing_required}") print(f"is lullc required: {self.is_lulc_required}") - if self.is_compute: - Upload_Desilting_Points.apply_async( - kwargs={ - "file_obj_id": self.id, - "gee_account_id": self.gee_account_id, - "is_lulc_required": self.is_lulc_required, - "is_processing_required": self.is_processing_required, - "is_closest_wp": self.is_closest_wp, - }, - queue="waterbody1", - ) - else: + if not self.is_compute: logger.info( f"File uploaded by user {self.uploaded_by.username}. Triggering email to superadmins and support" ) diff --git a/waterrejuvenation/tasks.py b/waterrejuvenation/tasks.py index 52ccae3b..dcc12358 100644 --- a/waterrejuvenation/tasks.py +++ b/waterrejuvenation/tasks.py @@ -37,6 +37,8 @@ import logging from datetime import datetime import geemap + + from waterrejuvenation.utils import ( wait_for_task_completion, delete_asset_on_GEE, @@ -72,6 +74,7 @@ def Upload_Desilting_Points( is_lulc_required=True, gee_account_id=None, is_processing_required=True, + is_force_regeneration=True, ): import pandas as pd from .models import WaterbodiesFileUploadLog, WaterbodiesDesiltingLog @@ -87,6 +90,9 @@ def normalize(val): wb_obj = WaterbodiesFileUploadLog.objects.get(pk=file_obj_id) proj_obj = Project.objects.get(pk=wb_obj.project_id) + if is_force_regeneration: + wdsl_log = WaterbodiesDesiltingLog.objects.filter(project_id=wb_obj.project_id) + wdsl_log.delete() if wb_obj.process: logger.warning("File already processed. 
Skipping.") @@ -435,90 +441,125 @@ def Genereate_zoi_and_zoi_indicator( def BuildDesiltingLayer( project_id, asset_suffix=None, asset_folder=None, gee_account_id=None ): - # ee_initialize(gee_account_id) - from .models import WaterbodiesDesiltingLog + from waterrejuvenation.models import WaterbodiesDesiltingLog + + # ee_initialize(gee_account_id) # Uncomment if needed instance = Project.objects.get(pk=project_id) data = WaterbodiesDesiltingLog.objects.filter( project_id=project_id, closest_wb_lat__isnull=False, process=True ) + + # Asset paths asset_folder = [instance.name] - assst_suffix_desilt = f"Desilt_layer_{instance.name}_{instance.id}".lower() + asset_suffix_desilt = f"Desilt_layer_{instance.name}_{instance.id}".lower() asset_id_desilt = ( get_gee_dir_path( asset_folder, asset_path=GEE_PATHS["WATERBODY"]["GEE_ASSET_PATH"] ) - + assst_suffix_desilt + + asset_suffix_desilt ) delete_asset_on_GEE(asset_id_desilt) + + # File paths project_id = instance.id org_name = instance.organization.name app_type = instance.app_type project_name = instance.name - filename = ( - f"{org_name}_{app_type}_{project_id}_{project_name}_{int(datetime.now().timestamp())}" - + ".csv" - ) + filename = f"{org_name}_{app_type}_{project_id}_{project_name}_{int(datetime.now().timestamp())}.csv" directory = f"{org_name}/{app_type}/{project_id}_{project_name}" full_path = os.path.join(SITE_DATA_PATH, directory) - file_path = full_path + filename + file_path = os.path.join(full_path, filename) os.makedirs(full_path, exist_ok=True) + + # Write CSV + csv_columns = [ + "desilt_id", + "latitude", + "longitude", + "desiltingpoint_lat", + "desiltingpoint_lon", + "Village", + "distance_from_desilting_point", + "name_of_ngo", + "State", + "District", + "Taluka", + "waterbody_name", + "slit_excavated", + "intervention_year", + ] + with open(file_path, "w", newline="") as csvfile: writer = csv.writer(csvfile) - writer.writerow( - [ - "desilt_id", - "latitude", - "longitude", - "desiltingpoint_lat", - "desiltingpoint_lon", - "Village", - "distance_from_desilting_point", - "name_of_ngo", - "State", - "District", - "Taluka", - "waterbody_name", - "slit_excavated", - "intervention_year", - ] - ) + writer.writerow(csv_columns) for loc in data: writer.writerow( [ - val if val is not None and str(val).strip() != "" else "N/A" - for val in [ - loc.id, - loc.closest_wb_lat, - loc.closest_wb_long, - loc.lat, - loc.lon, - loc.Village, - loc.distance_closest_wb_pixel, - loc.name_of_ngo, - loc.State, - loc.District, - loc.Taluka, - loc.waterbody_name, - loc.slit_excavated, - loc.intervention_year, - ] + loc.id, + loc.closest_wb_lat, + loc.closest_wb_long, + loc.lat, + loc.lon, + loc.Village, + loc.distance_closest_wb_pixel, + loc.name_of_ngo, + loc.State, + loc.District, + loc.Taluka, + loc.waterbody_name, + loc.slit_excavated, + loc.intervention_year, ] ) + + # Read CSV into DataFrame df = pd.read_csv(file_path) - df = df.fillna("N/A").replace(r"^\s*$", "N/A", regex=True) + + # --- FIX: handle numeric vs string columns --- + numeric_cols = [ + "latitude", + "longitude", + "desiltingpoint_lat", + "desiltingpoint_lon", + "distance_from_desilting_point", + "slit_excavated", + ] + for col in numeric_cols: + df[col] = pd.to_numeric( + df[col], errors="coerce" + ) # keeps numbers, NaN if invalid + + # Drop rows without geometry + df = df.dropna(subset=["latitude", "longitude"]) + + # Fill missing numeric values with 0 (optional) + df[numeric_cols] = df[numeric_cols].fillna(0) + + # Fill missing string columns with "N/A" + string_cols = [c for c 
in df.columns if c not in numeric_cols] + df[string_cols] = df[string_cols].fillna("N/A").replace(r"^\s*$", "N/A", regex=True) + + # Create GeoDataFrame geometry = [Point(xy) for xy in zip(df["longitude"], df["latitude"])] gdf = gpd.GeoDataFrame(df, geometry=geometry) - gdf.set_crs("EPSG:4326", allow_override=True, inplace=True) - gdf = gdf.dropna(subset=["geometry"]) + gdf.set_crs("EPSG:4326", inplace=True) + + # Convert to GEE FeatureCollection fc = gdf_to_ee_fc(gdf) + + # Delete previous asset if exists delete_asset_on_GEE(asset_id_desilt) - point_tasks = ee.batch.Export.table.toAsset( - collection=fc, description=assst_suffix_desilt, assetId=asset_id_desilt + + # Export to GEE + task = ee.batch.Export.table.toAsset( + collection=fc, description=asset_suffix_desilt, assetId=asset_id_desilt ) - point_tasks.start() - wait_for_task_completion(point_tasks) + task.start() + wait_for_task_completion(task) + + return {"status": "success", "asset_id": asset_id_desilt} def BuildMWSLayer( diff --git a/waterrejuvenation/tests.py b/waterrejuvenation/tests.py index 7ce503c2..d05661fb 100644 --- a/waterrejuvenation/tests.py +++ b/waterrejuvenation/tests.py @@ -1,3 +1,88 @@ +import io + +import pandas as pd +from django.core.files.uploadedfile import SimpleUploadedFile from django.test import TestCase +from organization.models import Organization +from projects.models import AppType, Project +from rest_framework.test import APITestCase + +from users.models import User +from waterrejuvenation.models import WaterbodiesFileUploadLog + +class WaterRejExcelUploadCreateApiTests(APITestCase): + def setUp(self): + self.user = User.objects.create_user( + username="test_user", + email="test_user@example.com", + password="test_password", + ) + self.org = Organization.objects.create(name="TestOrg") + self.user.organization = self.org + self.user.save(update_fields=["organization"]) + + self.project = Project.objects.create( + name="TestProject", + organization=self.org, + app_type=AppType.WATERBODY_REJ, + enabled=True, + created_by=self.user, + updated_by=self.user, + ) + + self.client.force_authenticate(user=self.user) + + def _make_valid_excel_bytes(self): + # Column names must match what's used in: + # - waterrejuvenation/utils.py -> EXPECTED_EXCEL_HEADERS (case-insensitive) + # - waterrejuvenation/tasks.py -> row.get("Name of NGO"), etc (exact casing) + df = pd.DataFrame( + [ + { + "Sr No.": 1, + "Name of NGO": "NGO A", + "State": "State A", + "District": "District A", + "Taluka": "Taluka A", + "Village": "Village A", + "Name of the waterbody ": "Waterbody A", + "Latitude": 19.123, + "Longitude": 73.567, + "Silt Excavated as per App": "10", + "Intervention_year": "2017", + } + ] + ) + + buf = io.BytesIO() + with pd.ExcelWriter(buf, engine="openpyxl") as writer: + df.to_excel(writer, index=False) + buf.seek(0) + return buf.read() + + def test_create_upload_single_file_force_regenerate_false(self): + excel_bytes = self._make_valid_excel_bytes() + uploaded = SimpleUploadedFile( + "waterrejuvenation_test.xlsx", + excel_bytes, + content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + + url = f"/api/v1/projects/{self.project.id}/waterrejuvenation/excel/" + payload = { + "file": uploaded, + "gee_account_id": 1, + "is_lulc_required": True, + "is_processing_required": True, + "is_closest_wp": True, + "is_compute": False, # do not trigger Celery in unit test + "force_regenerate": False, + } + + resp = self.client.post(url, data=payload, format="multipart") + 
self.assertEqual(resp.status_code, 201, resp.content) -# Create your tests here. + self.assertEqual(resp.data["files_created"], 1) + self.assertTrue( + WaterbodiesFileUploadLog.objects.filter(project=self.project).exists() + ) diff --git a/waterrejuvenation/views.py b/waterrejuvenation/views.py index aed9559d..1ea308ab 100644 --- a/waterrejuvenation/views.py +++ b/waterrejuvenation/views.py @@ -48,6 +48,7 @@ def create(self, request, *args, **kwargs): print("inside create api") print(request.data) """Create new excel files - supports both single and multiple file uploads""" + project_id = self.kwargs.get("project_pk") print("Project: " + str(project_id)) if not project_id: @@ -56,63 +57,83 @@ def create(self, request, *args, **kwargs): status=status.HTTP_400_BAD_REQUEST, ) - # Get project and check if it's a plantation project and enabled + # Get project try: project = Project.objects.get(id=project_id, app_type=AppType.WATERBODY_REJ) except Project.DoesNotExist: return Response( - {"detail": "Plantation project not found or not enabled."}, + {"detail": "Project not found or not enabled."}, status=status.HTTP_400_BAD_REQUEST, ) - # Check if we have files in the request + # Determine if re-upload is allowed + allow_reupload = request.data.get("allow_reupload", True) + # Default behavior: + # - force_regenerate=True (default): delete all points and recreate + # - force_regenerate=False: keep existing points and add only new ones + def parse_bool(val, default=True): + if val is None: + return default + if isinstance(val, bool): + return val + if isinstance(val, (int, float)): + return bool(val) + if isinstance(val, str): + return val.strip().lower() in ("true", "1", "yes", "y") + return default + + # Get uploaded files files = [] - print(request.FILES) - # Handle single file upload case if "file" in request.FILES: files.append(request.FILES["file"]) - # Handle multiple files upload case if "files" in request.FILES: - print("Found multiple files with 'files[]'") - file_list = request.FILES.getlist("files") - print(f"Number of files in 'files[]': {len(file_list)}") - for f in file_list: - print(f" - {f.name}") - files.extend(file_list) - print(files) + files.extend(request.FILES.getlist("files")) + if not files: return Response( {"detail": "No files were uploaded."}, status=status.HTTP_400_BAD_REQUEST, ) - # Process each file created_files = [] errors = [] + force_regenerate = parse_bool( + request.data.get("force_regenerate", request.data.get("force_regeneration", True)), + default=True, + ) for uploaded_file in files: # Validate file extension if not uploaded_file.name.lower().endswith(".xlsx"): errors.append( - f"File '{uploaded_file.name}' is not a KML file. Only KML files are allowed." + f"File '{uploaded_file.name}' is not an Excel file. Only .xlsx allowed." 
) continue - # Calculate file hash to check for duplicates + # Calculate file hash uploaded_file.seek(0) file_hash = hashlib.md5() for chunk in uploaded_file.chunks(): file_hash.update(chunk) excel_hash = file_hash.hexdigest() - print(excel_hash) - # Check if file with same hash already exists - if WaterbodiesFileUploadLog.objects.filter( + print(f"Hash for {uploaded_file.name}: {excel_hash}") + + # Check for duplicates + existing_file = WaterbodiesFileUploadLog.objects.filter( project=project, excel_hash=excel_hash - ).exists(): - errors.append(f"File '{uploaded_file.name}' has already been uploaded.") - continue + ).first() + if existing_file: + if allow_reupload: + # Delete old file to allow re-upload + existing_file.delete() + print(f"Deleted previous upload for {uploaded_file.name}") + else: + errors.append( + f"File '{uploaded_file.name}' has already been uploaded." + ) + continue - # Prepare data for serializer + # Prepare serializer data data = { "name": request.data.get("name", valid_gee_text(uploaded_file.name)), "file": uploaded_file, @@ -125,44 +146,44 @@ def create(self, request, *args, **kwargs): "is_closest_wp": request.data.get("is_closest_wp", True), "is_compute": request.data.get("is_compute", False), } - print(data) serializer = self.get_serializer(data=data) try: uploaded_file.seek(0) - is_valid, error_msg = validate_excel_headers( uploaded_file, EXPECTED_EXCEL_HEADERS ) - if not is_valid: errors.append( f"File '{uploaded_file.name}' format error: {error_msg}" ) continue - # VERY IMPORTANT uploaded_file.seek(0) if serializer.is_valid(): - print("inside serailizer sv") - # Save excel file excel_file = serializer.save( project=project, uploaded_by=request.user, excel_hash=excel_hash, ) - - # # Convert KML to GeoJSON - # file_path = kml_file.file.path - # geojson_data = convert_kml_to_geojson(file_path) - # - # if geojson_data: - # # Update GeoJSON data in the model - # kml_file.geojson_data = geojson_data - # kml_file.save(update_fields=["geojson_data"]) - created_files.append(serializer.data) - print(created_files) + + # Celery trigger is handled here so we can pass `force_regenerate` + # without requiring a DB column. 
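One caveat worth noting here: multipart form fields arrive as strings, so the raw request.data.get("allow_reupload", True) a few lines up treats the string "false" as truthy; only values routed through parse_bool coerce as intended. A quick standalone check (the helper body is copied from the view above):

def parse_bool(val, default=True):
    if val is None:
        return default
    if isinstance(val, bool):
        return val
    if isinstance(val, (int, float)):
        return bool(val)
    if isinstance(val, str):
        return val.strip().lower() in ("true", "1", "yes", "y")
    return default

assert parse_bool("false") is False  # the raw string would be truthy
assert parse_bool("1") is True
assert parse_bool(None) is True      # falls back to the default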
+ if excel_file.is_compute: + from .tasks import Upload_Desilting_Points + + Upload_Desilting_Points.apply_async( + kwargs={ + "file_obj_id": excel_file.id, + "gee_account_id": excel_file.gee_account_id, + "is_lulc_required": excel_file.is_lulc_required, + "is_processing_required": excel_file.is_processing_required, + "is_closest_wp": excel_file.is_closest_wp, + "is_force_regeneration": force_regenerate, + }, + queue="waterbody1", + ) else: errors.append( f"Error validating file '{uploaded_file.name}': {serializer.errors}" ) @@ -171,21 +192,14 @@ def create(self, request, *args, **kwargs): errors.append(f"Error saving file '{uploaded_file.name}': {str(e)}") continue - # # Update the merged GeoJSON file for the project if any files were created - # if created_files: - # self.update_project_geojson(project) - # Prepare response response_data = {"files_created": len(created_files), "files": created_files} - if errors: response_data["errors"] = errors - # Return 201 if at least one file was created, otherwise 400 status_code = ( status.HTTP_201_CREATED if created_files else status.HTTP_400_BAD_REQUEST ) - return Response(response_data, status=status_code) From 0c9642729b6a64e95c6680c16ed3addf1d2df31d Mon Sep 17 00:00:00 2001 From: "kapil.dadheech@gramvaani.org" Date: Wed, 1 Apr 2026 11:49:29 +0530 Subject: [PATCH 2/2] Fix LULC size issue --- computing/lulc/lulc_v3.py | 90 +++++++--- computing/zoi_layers/zoi3.py | 122 ++++++++------ utilities/gee_utils.py | 28 ++++ waterrejuvenation/tasks.py | 310 +++++++++++++++++++++++++++++++---- 4 files changed, 446 insertions(+), 104 deletions(-) diff --git a/computing/lulc/lulc_v3.py b/computing/lulc/lulc_v3.py index 36af459d..50e7a8e2 100644 --- a/computing/lulc/lulc_v3.py +++ b/computing/lulc/lulc_v3.py @@ -1,6 +1,7 @@ import datetime import time from datetime import timedelta +import re import ee from dateutil.relativedelta import relativedelta @@ -17,6 +18,7 @@ is_gee_asset_exists, get_gee_dir_path, gcs_file_exists, + delete_gcs_raster_files, ) from nrm_app.celery import app from .cropping_frequency import * @@ -29,6 +31,30 @@ from computing.STAC_specs import generate_STAC_layerwise +def _parse_lulc_filename_for_years(final_output_filename: str): + """ + final_output_filename format (created in clip_lulc_v3): + <prefix>_<yyyy>-<mm>-<dd>_<yyyy>-<mm>-<dd>_LULCmap_<suffix> + + We use regex instead of split('_20') because the prefix may contain substrings like '_205' + which break naive year parsing (e.g. producing LULC_5_24_...). + """ + # Capture prefix + the two YYYY values around the date segments. + m = re.match( + r"^(?P<prefix>.*)_(?P<start>\d{4})-\d{2}-\d{2}_(?P<end>\d{4})-\d{2}-\d{2}_", + final_output_filename, + ) + if not m: + raise ValueError( + f"Unexpected LULC filename format: '{final_output_filename}'. " + "Expected <prefix>_<yyyy>-<mm>-<dd>_<yyyy>-<mm>-<dd>_LULCmap_<suffix>."
+ ) + prefix = m.group("prefix") + s_year = m.group("start")[2:] + e_year = m.group("end")[2:] + return prefix, s_year, e_year + + @app.task(bind=True) def clip_lulc_v3( self, @@ -42,6 +68,7 @@ def clip_lulc_v3( asset_folder=None, asset_suffix=None, app_type="MWS", + force_regenerate=False, ): ee_initialize(gee_account_id) print("Inside lulc_river_basin") @@ -76,16 +103,17 @@ def clip_lulc_v3( final_output_assetid_array_new = [] layer_obj = None - try: - layer_obj = get_layer_object( - state, - district, - block, - layer_name=f"LULC_17_18_{filename_prefix}_level_3", - dataset_name="LULC_level_3", - ) - except Exception as e: - print("DB layer not found for lulc.") + if not force_regenerate: + try: + layer_obj = get_layer_object( + state, + district, + block, + layer_name=f"LULC_17_18_{filename_prefix}_level_3", + dataset_name="LULC_level_3", + ) + except Exception as e: + print("DB layer not found for lulc.") new_loop_start = None if layer_obj: @@ -124,13 +152,21 @@ def clip_lulc_v3( pan_india = ee.Image( f"projects/corestack-datasets/assets/datasets/LULC_v3_river_basin/pan_india_lulc_v3_{curr_start_year}_{curr_end_year}" ) - clipped_lulc = pan_india.clipToCollection(roi.geometry()) + # clipToCollection expects a FeatureCollection; we have geometry here. + clipped_lulc = pan_india.clip(roi.geometry()) l1_asset_new.append(clipped_lulc) task_list = [] geometry = roi.geometry() - if not is_gee_asset_exists(final_output_assetid_array_new[len(l1_asset_new) - 1]): + if force_regenerate or not is_gee_asset_exists( + final_output_assetid_array_new[len(l1_asset_new) - 1] + ): for i in range(0, len(l1_asset_new)): + if force_regenerate and is_gee_asset_exists(final_output_assetid_array_new[i]): + try: + ee.data.deleteAsset(final_output_assetid_array_new[i]) + except Exception as e: + print("Error deleting existing LULC asset during force regenerate:", e) if ( is_gee_asset_exists(final_output_assetid_array_new[i]) and len(l1_asset_new) <= 2 @@ -191,6 +227,7 @@ def clip_lulc_v3( final_output_filename_array_new, final_output_assetid_array_new, scale, + force_regenerate=force_regenerate, ) layer_at_geoserver = sync_lulc_to_geoserver( @@ -206,17 +243,22 @@ def clip_lulc_v3( def sync_lulc_to_gcs( - final_output_filename_array_new, final_output_assetid_array_new, scale + final_output_filename_array_new, + final_output_assetid_array_new, + scale, + force_regenerate=False, ): task_ids = [] for i in range(0, len(final_output_assetid_array_new)): make_asset_public(final_output_assetid_array_new[i]) image = ee.Image(final_output_assetid_array_new[i]) - name_arr = final_output_filename_array_new[i].split("_20") - s_year = name_arr[1][:2] - e_year = name_arr[2][:2] - layer_name = "LULC_" + s_year + "_" + e_year + "_" + name_arr[0] - if not gcs_file_exists(layer_name): + prefix, s_year, e_year = _parse_lulc_filename_for_years( + final_output_filename_array_new[i] + ) + layer_name = "LULC_" + s_year + "_" + e_year + "_" + prefix + if force_regenerate: + delete_gcs_raster_files(layer_name) + if force_regenerate or not gcs_file_exists(layer_name): task_ids.append(sync_raster_to_gcs(image, scale, layer_name)) task_id_list = check_task_status(task_ids) @@ -235,12 +277,10 @@ def sync_lulc_to_geoserver( lulc_workspaces = ["LULC_level_1", "LULC_level_2", "LULC_level_3"] layer_at_geoserver = False for i in range(0, len(final_output_filename_array_new)): - name_arr = final_output_filename_array_new[i].split( - "_20" - ) # TODO: better logic than this - s_year = name_arr[1][:2] - e_year = name_arr[2][:2] - gcs_file_name = 
"LULC_" + s_year + "_" + e_year + "_" + name_arr[0] + prefix, s_year, e_year = _parse_lulc_filename_for_years( + final_output_filename_array_new[i] + ) + gcs_file_name = "LULC_" + s_year + "_" + e_year + "_" + prefix print("Syncing " + gcs_file_name + " to geoserver") for workspace in lulc_workspaces: suff = workspace.replace("LULC", "") @@ -265,7 +305,7 @@ def sync_lulc_to_geoserver( ) if res and layer_ids: update_layer_sync_status(layer_id=layer_ids[i], sync_to_geoserver=True) - print("STAC: Name array check", name_arr[1]) + print("STAC: Parsed start year", s_year) if workspace == "LULC_level_3": start_year_STAC = "20" + str( diff --git a/computing/zoi_layers/zoi3.py b/computing/zoi_layers/zoi3.py index d3b99503..01d69bc7 100644 --- a/computing/zoi_layers/zoi3.py +++ b/computing/zoi_layers/zoi3.py @@ -70,59 +70,79 @@ def get_ndvi_for_zoi( + description_ndvi ) - zoi_collections = ee.FeatureCollection(asset_id_zoi) - total_features = zoi_collections.size().getInfo() - print(f"Total ZOI features for NDVI: {total_features}") - - # For larger feature collections, split into chunks and merge chunk assets. - # This avoids export failures for heavy NDVI computations. - chunk_size = 40 - if total_features > chunk_size: - print(f"Running chunked NDVI generation with chunk size {chunk_size}") - chunk_assets = [] - chunk_merge_task_ids = [] - chunks = _build_fc_chunks(zoi_collections, chunk_size) - - for part, chunk_fc in chunks: - chunk_suffix = f"{description_ndvi}_chunk_{part}" - chunk_asset_path = f"{ndvi_asset_path}_chunk_{part}" - - if is_gee_asset_exists(chunk_asset_path): - ee.data.deleteAsset(chunk_asset_path) - - chunk_result_fc = get_ndvi_data( - chunk_fc, 2017, 2024, chunk_suffix, chunk_asset_path - ) - task_id = export_vector_asset_to_gee( - chunk_result_fc, chunk_suffix, chunk_asset_path - ) - if task_id: - chunk_merge_task_ids.append(task_id) - chunk_assets.append(chunk_asset_path) - - if chunk_merge_task_ids: - check_task_status(chunk_merge_task_ids) - - if not chunk_assets: - raise Exception("No NDVI chunk assets were generated for merge.") - - merged_fc = ee.FeatureCollection(chunk_assets).flatten() - if is_gee_asset_exists(ndvi_asset_path): - ee.data.deleteAsset(ndvi_asset_path) - final_task = ee.batch.Export.table.toAsset( - collection=merged_fc, description=description_ndvi, assetId=ndvi_asset_path - ) - final_task.start() - wait_for_task_completion(final_task) + # If the final NDVI asset already exists, do not regenerate chunks. + if is_gee_asset_exists(ndvi_asset_path): fc = ee.FeatureCollection(ndvi_asset_path) else: - fc = get_ndvi_data(zoi_collections, 2017, 2024, description_ndvi, ndvi_asset_path) - task = ee.batch.Export.table.toAsset( - collection=fc, description=description_ndvi, assetId=ndvi_asset_path - ) - task.start() - wait_for_task_completion(task) - fc = ee.FeatureCollection(ndvi_asset_path) + zoi_collections = ee.FeatureCollection(asset_id_zoi) + total_features = zoi_collections.size().getInfo() + print(f"Total ZOI features for NDVI: {total_features}") + + # For larger feature collections, split into chunks and merge chunk assets. + # This avoids export failures for heavy NDVI computations. 
+ chunk_size = 40 + if total_features > chunk_size: + print(f"Running chunked NDVI generation with chunk size {chunk_size}") + chunk_assets = [] + chunk_merge_task_ids = [] + chunks = _build_fc_chunks(zoi_collections, chunk_size) + + for part, chunk_fc in chunks: + chunk_suffix = f"{description_ndvi}_chunk_{part}" + chunk_asset_path = f"{ndvi_asset_path}_chunk_{part}" + + # Do not regenerate an existing chunk asset. + if is_gee_asset_exists(chunk_asset_path): + chunk_assets.append(chunk_asset_path) + continue + + chunk_result_fc = get_ndvi_data( + chunk_fc, 2017, 2024, chunk_suffix, chunk_asset_path + ) + task_id = export_vector_asset_to_gee( + chunk_result_fc, chunk_suffix, chunk_asset_path + ) + if task_id: + chunk_merge_task_ids.append(task_id) + chunk_assets.append(chunk_asset_path) + + if chunk_merge_task_ids: + check_task_status(chunk_merge_task_ids) + + if not chunk_assets: + raise Exception("No NDVI chunk assets were generated for merge.") + + # Load each chunk asset as a FeatureCollection, then merge them. + merged_fc = None + for asset_id in chunk_assets: + chunk_fc = ee.FeatureCollection(asset_id) + merged_fc = ( + chunk_fc if merged_fc is None else merged_fc.merge(chunk_fc) + ) + + # Export the merged NDVI asset. + final_task = ee.batch.Export.table.toAsset( + collection=merged_fc, + description=description_ndvi, + assetId=ndvi_asset_path, + ) + final_task.start() + wait_for_task_completion(final_task) + fc = ee.FeatureCollection(ndvi_asset_path) + else: + fc = get_ndvi_data( + zoi_collections, + 2017, + 2024, + description_ndvi, + ndvi_asset_path, + ) + task = ee.batch.Export.table.toAsset( + collection=fc, description=description_ndvi, assetId=ndvi_asset_path + ) + task.start() + wait_for_task_completion(task) + fc = ee.FeatureCollection(ndvi_asset_path) start_date = "30-06-2017" end_date = "01-07-2024" diff --git a/utilities/gee_utils.py b/utilities/gee_utils.py index 273fa052..fbb98300 100644 --- a/utilities/gee_utils.py +++ b/utilities/gee_utils.py @@ -396,13 +396,22 @@ def is_asset_public(asset_id): def sync_raster_to_gcs(image, scale, layer_name): print("inside sync_raster_to_gcs") + # Optimize large/sparse exports: + # - region limits export to image footprint + # - skipEmptyTiles avoids writing empty tiles for disjoint ROI + # - cloudOptimized produces GeoTIFFs better suited for serving/upload export_task = ee.batch.Export.image.toCloudStorage( image=image, description="gcs_" + layer_name, bucket=GCS_BUCKET_NAME, fileNamePrefix="nrm_raster/" + layer_name, scale=scale, + region=image.geometry(), fileFormat="GeoTIFF", + formatOptions={"cloudOptimized": True}, + skipEmptyTiles=True, + fileDimensions=4096, + shardSize=256, crs="EPSG:4326", maxPixels=1e13, ) @@ -580,6 +589,25 @@ def gcs_file_exists(layer_name): return False +def delete_gcs_raster_files(layer_name): + """ + Delete existing raster objects for a layer from GCS. 
+ Handles both: + - nrm_raster/<layer_name>.tif + - multipart outputs with the prefix nrm_raster/<layer_name> + """ + bucket = gcs_config() + prefix = f"nrm_raster/{layer_name}" + deleted = 0 + for blob in bucket.list_blobs(prefix=prefix): + try: + blob.delete() + deleted += 1 + except Exception as e: + print(f"Failed deleting GCS blob {blob.name}: {e}") + print(f"Deleted {deleted} GCS blob(s) for layer {layer_name}") + + def upload_tif_from_gcs_to_gee(gcs_path, asset_id, scale): # Read the image image = ee.Image.loadGeoTIFF(gcs_path) diff --git a/waterrejuvenation/tasks.py b/waterrejuvenation/tasks.py index dcc12358..e490486b 100644 --- a/waterrejuvenation/tasks.py +++ b/waterrejuvenation/tasks.py @@ -31,6 +31,7 @@ gdf_to_ee_fc, get_gee_dir_path, make_asset_public, + is_gee_asset_exists, valid_gee_text, ) import ee @@ -77,14 +78,77 @@ def Upload_Desilting_Points( is_force_regeneration=True, ): import pandas as pd + import requests + from django.conf import settings as django_settings from .models import WaterbodiesFileUploadLog, WaterbodiesDesiltingLog - def normalize(val): + def normalize_str(val, default="unknown"): + """Normalize Excel string-ish fields. + + - Blank/NaN -> "unknown" + - Non-blank -> trimmed string + """ + if pd.isna(val): + return default + if isinstance(val, str): + val = val.strip() + return val if val else default + return str(val) if val is not None else default + + def normalize_float(val): + """Normalize Excel numeric fields (lat/lon). Blank/NaN -> None.""" if pd.isna(val): return None if isinstance(val, str) and val.strip() == "": return None - return val + try: + return float(val) + except (TypeError, ValueError): + return None + + def parse_admin_details_by_latlon(lat, lon): + """Call GeoServer API to fetch State/District/Tehsil for a lat/lon.""" + # Cache to avoid repeated calls for identical coordinates.
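The dictionary cache above lives for one task run. A standalone sketch of the same idea with functools; rounding to 4 decimal places buckets coordinates within roughly 11 m at the equator, and fetch_admin_details is a hypothetical stand-in for the HTTP lookup:

from functools import lru_cache

def fetch_admin_details(lat, lon):
    # Hypothetical stand-in for the GeoServer admin-details call.
    return {"State": "unknown", "District": "unknown", "Tehsil": "unknown"}

@lru_cache(maxsize=4096)
def _cached_lookup(lat4, lon4):
    return fetch_admin_details(lat4, lon4)

def admin_for(lat, lon):
    # Round first so nearby duplicate points share one cache entry.
    return _cached_lookup(round(lat, 4), round(lon, 4))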
+ cache_key = (round(lat, 4), round(lon, 4)) + if cache_key in admin_cache: + return admin_cache[cache_key] + + api_key = os.getenv( + "ADMIN_DETAILS_BY_LATLON_API_KEY", + os.getenv( + "GET_ADMIN_DETAILS_BY_LATLON_API_KEY", + os.getenv("GEO_SERVER_PUBLIC_API_KEY") or "", + ), + ) + if not api_key: + admin_cache[cache_key] = {"State": "unknown", "District": "unknown", "Tehsil": "unknown"} + return admin_cache[cache_key] + + url = f"{django_settings.BASE_URL}api/v1/get_admin_details_by_latlon/" + headers = {"X-API-Key": api_key} + params = {"latitude": lat, "longitude": lon} + try: + resp = requests.get(url, params=params, headers=headers, timeout=30) + if resp.status_code != 200: + admin_cache[cache_key] = {"State": "unknown", "District": "unknown", "Tehsil": "unknown"} + return admin_cache[cache_key] + + data = resp.json() if resp.content else {} + result = { + "State": data.get("State") or "unknown", + "District": data.get("District") or "unknown", + "Tehsil": data.get("Tehsil") or "unknown", + } + admin_cache[cache_key] = result + return result + except Exception as e: + logger.warning(f"Admin lookup failed for ({lat}, {lon}): {e}") + admin_cache[cache_key] = { + "State": "unknown", + "District": "unknown", + "Tehsil": "unknown", + } + return admin_cache[cache_key] ee_initialize(gee_account_id) @@ -100,6 +164,7 @@ def normalize(val): df = pd.read_excel(wb_obj.file) merged_features = [] + admin_cache = {} for index, row in df.iterrows(): print(row) @@ -107,16 +172,18 @@ def normalize(val): # Create DB row FIRST (lossless) # ----------------------------- dsilting_obj_log = WaterbodiesDesiltingLog.objects.create( - name_of_ngo=normalize(row.get("Name of NGO")), - State=normalize(row.get("State")), - District=normalize(row.get("District")), - Taluka=normalize(row.get("Taluka")), - Village=normalize(row.get("Village")), - waterbody_name=normalize(row.get("Name of the waterbody ")), - lat=normalize(row.get("Latitude")), - lon=normalize(row.get("Longitude")), - slit_excavated=normalize(row.get("Silt Excavated as per App")), - intervention_year=normalize(row.get("Intervention_year")), + # For user-facing text fields: blank -> "unknown" + name_of_ngo=normalize_str(row.get("Name of NGO")), + State=normalize_str(row.get("State")), + District=normalize_str(row.get("District")), + Taluka=normalize_str(row.get("Taluka")), + Village=normalize_str(row.get("Village")), + waterbody_name=normalize_str(row.get("Name of the waterbody ")), + # For numeric fields: blank -> None (so we can validate/skip) + lat=normalize_float(row.get("Latitude")), + lon=normalize_float(row.get("Longitude")), + slit_excavated=normalize_str(row.get("Silt Excavated as per App")), + intervention_year=normalize_str(row.get("Intervention_year")), excel_hash=wb_obj.excel_hash, project=proj_obj, process=False, @@ -179,6 +246,20 @@ def normalize(val): dsilting_obj_log.distance_closest_wb_pixel = distance dsilting_obj_log.process = True dsilting_obj_log.failure_reason = None + + # If admin fields were blank in Excel, fill using GeoServer API. + # We use the nearest water pixel coords to ensure consistency. 
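Because the requests.get call above is attempted once, a transient GeoServer failure stamps "unknown" on the row for good. If retries are wanted, requests supports them through urllib3's Retry; a sketch (allowed_methods requires urllib3 >= 1.26):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(
    total=3,                                # up to three retries
    backoff_factor=1.0,                     # exponential backoff between attempts
    status_forcelist=[429, 502, 503, 504],  # retry only transient statuses
    allowed_methods=["GET"],
)
session.mount("https://", HTTPAdapter(max_retries=retry))
session.mount("http://", HTTPAdapter(max_retries=retry))

# Drop-in for the single-shot call above:
# resp = session.get(url, params=params, headers=headers, timeout=30)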
+ if ( + dsilting_obj_log.State == "unknown" + or dsilting_obj_log.District == "unknown" + or dsilting_obj_log.Taluka == "unknown" + ): + admin = parse_admin_details_by_latlon(closest_lat, closest_lon) + dsilting_obj_log.State = admin.get("State") or "unknown" + dsilting_obj_log.District = admin.get("District") or "unknown" + # API returns "Tehsil"; DB column is "Taluka" + dsilting_obj_log.Taluka = admin.get("Tehsil") or "unknown" + dsilting_obj_log.save() try: @@ -250,6 +331,7 @@ def Generate_lulc_mws( asset_folder=asset_folder, asset_suffix=f"{proj_obj.name}_{proj_obj.id}".lower(), app_type="WATERBODY", + force_regenerate=True, ) logger.info("luc Task finished for lulc") except Exception as e: @@ -783,24 +865,194 @@ def remove_match(_): # ------------------------- # Export merged FC to GEE asset # ------------------------- - delete_asset_on_GEE(asset_id_wb_mws) - task = ee.batch.Export.table.toAsset( - collection=merged_fc, - description=asset_suffix_wb, - assetId=asset_id_wb_mws, - ) + # Earth Engine can reject very large table exports with: + # "Request payload size exceeds the limit" (10MB payload limit). + # Strategy: + # 1) Try exporting the full table. + # 2) If it fails with that specific payload-size error, retry using chunk exports. + # IMPORTANT: + # Calling merged_fc.size().getInfo() can itself fail with the same + # "Request payload size exceeds the limit" error for very large tables. + # We therefore reuse `final_count` computed earlier for the base table, + # and if it's missing we will assume an upper bound only when retrying. + merged_count = final_count + + # Aggressive fallback chunk size to avoid EE 10MB payload limit. + # Slower, but safest for very heavy per-feature payloads. + chunk_size = 1 + chunk_asset_ids = [] + + def _export_table(collection, description, asset_id): + """ + Start an EE table export and recover once from request_id collision. + """ + try_descriptions = [description, f"{description}_{int(time.time())}"] + last_err = None + for idx, desc in enumerate(try_descriptions): + try: + export_task = ee.batch.Export.table.toAsset( + collection=collection, + description=desc, + assetId=asset_id, + ) + export_task.start() + return export_task + except ee.EEException as err: + last_err = err + if ( + "A different Operation was already started with the given request_id" + in str(err) + and idx < len(try_descriptions) - 1 + ): + # Retry immediately with a unique description so EE creates + # a fresh operation/request context. + time.sleep(2) + continue + raise + raise last_err + + export_task = None + try: + if is_gee_asset_exists(asset_id_wb_mws): + delete_asset_on_GEE(asset_id_wb_mws) + export_task = _export_table(merged_fc, asset_suffix_wb, asset_id_wb_mws) + logger.info( + "Started export task %s -> %s", export_task.id, asset_id_wb_mws + ) + wait_for_task_completion(export_task) + merged_fc = ee.FeatureCollection(asset_id_wb_mws) + except ee.EEException as ee_err: + msg = str(ee_err) + if "Request payload size exceeds the limit" not in msg: + raise + + # Retry with chunk exports + logger.warning( + "Export payload too large. Retrying with chunk exports (chunk_size=%s). Error=%s", + chunk_size, + msg, + ) + if merged_count is None: + # Fallback: pick a conservative upper bound to avoid calling + # merged_fc.size().getInfo() (which triggers payload errors). 
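When the table originates from a local GeoDataFrame — as here, where gdf backs the chunked re-export below — the feature count is free on the client and needs no Earth Engine call at all. A toy illustration:

import geopandas as gpd
from shapely.geometry import Point

# Toy stand-in for the task's GeoDataFrame of desilting points.
gdf = gpd.GeoDataFrame(
    {"uid": [1, 2, 3]},
    geometry=[Point(73.5, 19.1), Point(73.6, 19.2), Point(73.7, 19.3)],
    crs="EPSG:4326",
)
merged_count = len(gdf)  # exact count, no server round-trip
print(merged_count)      # 3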
+ assumed_max_features = chunk_size * 50 # 50 features with chunk_size=1 + merged_count = assumed_max_features + logger.warning( + "merged_count unknown; assuming max features=%s (chunk_size=%s, chunks=%s).", + merged_count, + chunk_size, + 50, + ) + + # Export each chunk (reuse existing chunks if present). + # Use local GDF chunking so each chunk builds a smaller EE graph. + local_chunk_size = 50 + for start in range(0, len(gdf), local_chunk_size): + end = min(start + local_chunk_size, len(gdf)) + chunk_asset_suffix = f"{asset_suffix_wb}_chunk_{start}-{end}" + chunk_asset_id = ( + get_gee_dir_path( + asset_folder, + asset_path=GEE_PATHS["WATERBODY"]["GEE_ASSET_PATH"], + ) + + chunk_asset_suffix + ) - task.start() - logger.info("Started export task %s -> %s", task.id, asset_id_wb_mws) + if is_gee_asset_exists(chunk_asset_id): + chunk_asset_ids.append(chunk_asset_id) + continue - # Wait for completion (uses your helper) - wait_for_task_completion(task) + chunk_geojson_op = f"{mws_geojson_op}_chunk_{start}_{end}.geojson" + chunk_gdf = gdf.iloc[start:end].copy() + chunk_gdf.to_file(chunk_geojson_op, driver="GeoJSON") - # After export, optionally refresh or get info - try: - exported_count = ee.FeatureCollection(asset_id_wb_mws).size().getInfo() - except Exception: - exported_count = None + primary_chunk_fc = ee.FeatureCollection( + calculate_precipitation_season( + chunk_geojson_op, draught_asset_id=draught_asset_id + ) + ) + + if drought_count == 0: + chunk_fc = primary_chunk_fc.map( + lambda f: ee.Feature(f).select( + ee.List(ee.Feature(f).propertyNames()) + ) + ) + else: + join = ee.Join.saveFirst("match") + ffilter = ee.Filter.equals(leftField="uid", rightField="uid") + joined = join.apply( + primary=primary_chunk_fc, + secondary=drought_fc, + condition=ffilter, + ) + + def _flatten_match(feat): + feat = ee.Feature(feat) + + def copy_props(_): + match = ee.Feature(feat.get("match")) + match_props = match.propertyNames() + + def _setter(prop, acc): + acc = ee.Feature(acc) + prop = ee.String(prop) + val = match.get(prop) + new_name = ee.String("drought_").cat(prop) + return acc.set(new_name, val) + + merged = ee.Feature(match_props.iterate(_setter, feat)) + merged = ee.Feature(merged).select( + ee.List(merged.propertyNames()).remove("match") + ) + return merged + + def remove_match(_): + return ee.Feature(feat).select( + ee.List(feat.propertyNames()).remove("match") + ) + + result = ee.Algorithms.If( + feat.get("match"), copy_props(None), remove_match(None) + ) + return ee.Feature(result) + + chunk_fc = ee.FeatureCollection(joined.map(_flatten_match)) + + delete_asset_on_GEE(chunk_asset_id) + chunk_task = _export_table(chunk_fc, chunk_asset_suffix, chunk_asset_id) + logger.info( + "Started chunk export %s -> %s", + chunk_task.id, + chunk_asset_id, + ) + wait_for_task_completion(chunk_task) + chunk_asset_ids.append(chunk_asset_id) + + # Merge chunk assets back into a single final asset.
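The accumulator loop that follows also appears in the same shape in zoi3.py; a fold-based equivalent, assuming an initialized Earth Engine session and a non-empty list of existing chunk assets:

import functools
import ee

def merge_table_assets(asset_ids):
    # Same result as the accumulator loop below: one server-side
    # FeatureCollection composed of every chunk asset.
    parts = [ee.FeatureCollection(asset_id) for asset_id in asset_ids]
    return functools.reduce(lambda acc, part: acc.merge(part), parts)

merge() composes collections server-side, so the client payload stays flat no matter how many chunks are stitched together.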
+ merged_fc_final = None + for asset_id in chunk_asset_ids: + part_fc = ee.FeatureCollection(asset_id) + merged_fc_final = ( + part_fc + if merged_fc_final is None + else merged_fc_final.merge(part_fc) + ) + + if is_gee_asset_exists(asset_id_wb_mws): + delete_asset_on_GEE(asset_id_wb_mws) + export_task = _export_table( + merged_fc_final, asset_suffix_wb, asset_id_wb_mws + ) + logger.info( + "Started final chunk-merged export -> %s", asset_id_wb_mws + ) + wait_for_task_completion(export_task) + merged_fc = ee.FeatureCollection(asset_id_wb_mws) + + # Avoid calling .size().getInfo() here; it can fail with the same + # payload-size limitations for very large tables. + exported_count = None # ------------------------- # Push to GeoServer @@ -818,7 +1070,9 @@ def remove_match(_): return { "status": "SUCCESS", "asset_id": asset_id_wb_mws, - "export_task_id": task.id if hasattr(task, "id") else None, + "export_task_id": ( + export_task.id if export_task and hasattr(export_task, "id") else None + ), "feature_count": exported_count, "message": f"Exported and synced layer {layer_name}", }
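Finally, a regression sketch for the filename parsing this series fixes — the '_205' case called out in the _parse_lulc_filename_for_years docstring. The concrete filename and suffix are illustrative, and the import path is assumed from the diff above:

from computing.lulc.lulc_v3 import _parse_lulc_filename_for_years

# A prefix containing "_205" used to break the old split("_20") parsing,
# yielding names like "LULC_5_24_...". The regex version reads the years
# from the date segments instead.
prefix, s_year, e_year = _parse_lulc_filename_for_years(
    "block_205_ab_2017-07-01_2018-06-30_LULCmap_10m"
)
assert (prefix, s_year, e_year) == ("block_205_ab", "17", "18")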